OMX组件的标准接口(OMX_Core.h
)中,OMX_FillThisBuffer
和OMX_EmptyThisBuffer
共同完成了OMX的buffer运转。OMX_FillThisBuffer操作解码后的数据(PCM/YUV)的,OMX_EmptyThisBuffer操作解码前(es)数据。本博客将进行分析OMX_FillThisBuffer,下一个博客再分析OMX_EmptyThisBuffer。 整个buffer机制分析需要从,这两个过程的结合实际上是生产者和消费者之间的关系。OMX_FillThisBuffer而言,MediaCodec是消费者,OMX组件是生产者。 之前讲OMX在新闻机制中,我说过OMXNodeInstance实例化一个OMX_CALLBACKTYPE
指向三个函数的结构体:
OMX_CALLBACKTYPE OMXNodeInstance::kCallbacks = {
&OnEvent, &OnEmptyBufferDone, &OnFillBufferDone };
OMX_CALLBACKTYPE
就是OMX_Core.h标准接口:
typedef struct OMX_CALLBACKTYPE {
OMX_ERRORTYPE (*EventHandler)( OMX_IN OMX_HANDLETYPE hComponent, OMX_IN OMX_PTR pAppData, OMX_IN OMX_EVENTTYPE eEvent, OMX_IN OMX_U32 nData1, OMX_IN OMX_U32 nData2, OMX_IN OMX_PTR pEventData); OMX_ERRORTYPE (*EmptyBufferDone)( OMX_IN OMX_HANDLETYPE hComponent, OMX_IN OMX_PTR pAppData, OMX_IN OMX_BUFFERHEADERTYPE* pBuffer); OMX_ERRORTYPE (*FillBufferDone)( OMX_OUT OMX_HANDLETYPE hComponent, OMX_OUT OMX_PTR pAppData, OMX_OUT OMX_BUFFERHEADERTYPE* pBuffer); } OMX_CALLBACKTYPE;
EmptyBufferDone
和FillBufferDone
就是OMX通知给ACodec回调接口。
ACodecBufferChannel
这个类是供ACodec用于更新buffer信息,类的实例操作是MediaCodec的init
完成函数:
status_t MediaCodec::init(const AString &name) {
... /* 1.实例化ACodecBufferChannel */ mBufferChannel = mCodec->getBufferChannel(); /* 2.设置ACodecBufferChannel的回调函数 */ mBufferChannel->setCallback(
std::unique_ptr<CodecBase::BufferCallback>(
new BufferCallback(new AMessage(kWhatCodecNotify, this))));
...
}
看一下ACodec中getBufferChannel
函数的操作:
std::shared_ptr<BufferChannelBase> ACodec::getBufferChannel() {
if (!mBufferChannel) {
mBufferChannel = std::make_shared<ACodecBufferChannel>(
new AMessage(kWhatInputBufferFilled, this),
new AMessage(kWhatOutputBufferDrained, this));
}
return mBufferChannel;
}
构造函数中实例化了两个msg,分别和OMX组件的标准回调消息EmptyBufferDone
及FillBufferDone
配合使用。 回到前面看回调的设置,当ACodecBufferChannel实例化完成后,MediaCodec为了能和ACodec同步获知OMX对buffer的更新情况,会设置一个BufferCallback
(主要是一个msg)到ACodecBufferChannel中:
BufferCallback::BufferCallback(const sp<AMessage> ¬ify)
: mNotify(notify) {
}
notify就是上面的kWhatCodecNotify
,有了这个msg之后,ACodecBufferChannel中的消息就可以由MediaCodec来处理了。
不论是vdec组件还是adec组件,当某个buffer的数据填充完之后,OMX组件会调用callbacks.FillBufferDone
通知至上层,下面看下芯片平台的具体实现:
pMSComponent->callbacks.FillBufferDone(pOMXComponent, pMSComponent->callbackData, bufferHeader);
参数中bufferHeader
包括了解码完后buffer的所有信息。下面看下OMXNodeInstance
中OnFillBufferDone
的实现:
OMX_ERRORTYPE OMXNodeInstance::OnFillBufferDone(
OMX_IN OMX_HANDLETYPE /* hComponent */,
OMX_IN OMX_PTR pAppData,
OMX_IN OMX_BUFFERHEADERTYPE* pBuffer) {
if (pAppData == NULL) {
ALOGE("b/25884056");
return OMX_ErrorBadParameter;
}
OMXNodeInstance *instance = static_cast<OMXNodeInstance *>(pAppData);
if (instance->mDying) {
return OMX_ErrorNone;
}
int fenceFd = instance->retrieveFenceFromMeta_l(pBuffer, kPortIndexOutput);
omx_message msg;
msg.type = omx_message::FILL_BUFFER_DONE;
msg.fenceFd = fenceFd;
msg.u.extended_buffer_data.buffer = instance->findBufferID(pBuffer);
msg.u.extended_buffer_data.range_offset = pBuffer->nOffset;
msg.u.extended_buffer_data.range_length = pBuffer->nFilledLen;
msg.u.extended_buffer_data.flags = pBuffer->nFlags;
msg.u.extended_buffer_data.timestamp = pBuffer->nTimeStamp;
instance->mDispatcher->post(msg);
return OMX_ErrorNone;
}
OnFillBufferDone的函数实现比较简单,就是将buffer的相关信息填充到omx_message
结构体中并发送出去。下面看下omx_message::FILL_BUFFER_DONE
的消息处理:
void OMXNodeInstance::onMessages(std::list<omx_message> &messages) {
for (std::list<omx_message>::iterator it = messages.begin(); it != messages.end(); ) {
/* OMXNodeInstance的消息处理 */
if (handleMessage(*it)) {
messages.erase(it++);
} else {
++it;
}
}
/* ACodec的消息处理 */
if (!messages.empty()) {
mObserver->onMessages(messages);
}
}
先看handleMessage
:
if (msg.type == omx_message::FILL_BUFFER_DONE) {
/* 1.获取buffer信息 */
OMX_BUFFERHEADERTYPE *buffer =
findBufferHeader(msg.u.extended_buffer_data.buffer, kPortIndexOutput);
if (buffer == NULL) {
ALOGE("b/25884056");
return false;
}
{
Mutex::Autolock _l(mDebugLock);
mOutputBuffersWithCodec.remove(buffer);
CLOG_BUMPED_BUFFER(
FBD, WITH_STATS(FULL_BUFFER(
msg.u.extended_buffer_data.buffer, buffer, msg.fenceFd)));
unbumpDebugLevel_l(kPortIndexOutput);
}
BufferMeta *buffer_meta =
static_cast<BufferMeta *>(buffer->pAppPrivate);
if (buffer->nOffset + buffer->nFilledLen < buffer->nOffset
|| buffer->nOffset + buffer->nFilledLen > buffer->nAllocLen) {
CLOG_ERROR(onFillBufferDone, OMX_ErrorBadParameter,
FULL_BUFFER(NULL, buffer, msg.fenceFd));
}
/* 2.将OMX的buffer拷贝到framework层共享buffer中 */
buffer_meta->CopyFromOMX(buffer);
// fix up the buffer info (especially timestamp) if needed
codecBufferFilled(msg);
}
之前分析消息机制的时候,已经说过,这里的mObserver就是ACodec传递下来的,中间的消息传递过程就不再赘述:
case omx_message::FILL_BUFFER_DONE:
{
IOMX::buffer_id bufferID;
CHECK(msg->findInt32("buffer", (int32_t*)&bufferID));
int32_t rangeOffset, rangeLength, flags, fenceFd;
int64_t timeUs;
CHECK(msg->findInt32("range_offset", &rangeOffset));
CHECK(msg->findInt32("range_length", &rangeLength));
CHECK(msg->findInt32("flags", &flags));
CHECK(msg->findInt64("timestamp", &timeUs));
CHECK(msg->findInt32("fence_fd", &fenceFd));
return onOMXFillBufferDone(
bufferID,
(size_t)rangeOffset, (size_t)rangeLength,
(OMX_U32)flags,
timeUs,
fenceFd);
}
继续看BaseState::onOMXFillBufferDone
:
bool ACodec::BaseState::onOMXFillBufferDone(
IOMX::buffer_id bufferID,
size_t rangeOffset, size_t rangeLength,
OMX_U32 flags,
int64_t timeUs,
int fenceFd) {
...
PortMode mode = getPortMode(kPortIndexOutput);
switch (mode) {
...
case RESUBMIT_BUFFERS:
{
...
/* 更新buffer池 */
mCodec->mBufferChannel->drainThisBuffer(info->mBufferID, flags);
...
}
...
}
...
}
这里就需要用到ACodecBufferChannel
了,看一下drainThisBuffer
的实现:
void ACodecBufferChannel::drainThisBuffer(
IOMX::buffer_id bufferId,
OMX_U32 omxFlags) {
ALOGV("drainThisBuffer #%d", bufferId);
std::shared_ptr<const std::vector<const BufferInfo>> array(
std::atomic_load(&mOutputBuffers));
BufferInfoIterator it = findBufferId(array, bufferId);
if (it == array->end()) {
ALOGE("drainThisBuffer: unrecognized buffer #%d", bufferId);
return;
}
if (it->mClientBuffer != it->mCodecBuffer) {
it->mClientBuffer->setFormat(it->mCodecBuffer->format());
}
uint32_t flags = 0;
if (omxFlags & OMX_BUFFERFLAG_SYNCFRAME) {
flags |= MediaCodec::BUFFER_FLAG_SYNCFRAME;
}
if (omxFlags & OMX_BUFFERFLAG_CODECCONFIG) {
flags |= MediaCodec::BUFFER_FLAG_CODECCONFIG;
}
if (omxFlags & OMX_BUFFERFLAG_EOS) {
flags |= MediaCodec::BUFFER_FLAG_EOS;
}
it->mClientBuffer->meta()->setInt32("flags", flags);
/* 调用BufferCallback的BufferCallback函数 */
mCallback->onOutputBufferAvailable(
std::distance(array->begin(), it),
it->mClientBuffer);
}
可以看到该函数的重点是回调通知到onOutputBufferAvailable
:
void BufferCallback::onOutputBufferAvailable(
size_t index, const sp<MediaCodecBuffer> &buffer) {
sp<AMessage> notify(mNotify->dup());
notify->setInt32("what", kWhatDrainThisBuffer);
notify->setSize("index", index);
notify->setObject("buffer", buffer);
notify->post();
}
消息处理中,最重要的就是下面这句:
case kWhatDrainThisBuffer:
{
/* size_t index = */updateBuffers(kPortIndexOutput, msg);
...
}
updateBuffers
实现:
size_t MediaCodec::updateBuffers(
int32_t portIndex, const sp<AMessage> &msg) {
CHECK(portIndex == kPortIndexInput || portIndex == kPortIndexOutput);
size_t index;
CHECK(msg->findSize("index", &index));
sp<RefBase> obj;
CHECK(msg->findObject("buffer", &obj));
sp<MediaCodecBuffer> buffer = static_cast<MediaCodecBuffer *>(obj.get());
{
Mutex::Autolock al(mBufferLock);
if (mPortBuffers[portIndex].size() <= index) {
mPortBuffers[portIndex].resize(align(index + 1, kNumBuffersAlign));
}
mPortBuffers[portIndex][index].mData = buffer;
}
/* 将可用buffer的index入队 */
mAvailPortBuffers[portIndex].push_back(index);
return index;
}
mAvailPortBuffers
这个队列存储了目前可用的buffer数量,实际上OMX组件与MediaCodec的最终交互就体现在这个队列中。到这里,整个OMX->MediaCodec
的buffer通知机制就分析完了,总结来说,就是当底层OMX组件有解码完后的数据时,会将buffer信息层层回调,通知至MediaCodec并更新到可用buffer队列中,进而供MediaCodec查询使用。
在上一个章节中讲到当OMX底层完成了解码并将buffer信息更新到mAvailPortBuffers
后,此时,上层应用就可以使用这个buffer中的数据进行播放或者送显,而首先需要做的,就是调用dequeueOutputBuffer
获得可用buffer的index:
status_t MediaCodec::dequeueOutputBuffer(
size_t *index,
size_t *offset,
size_t *size,
int64_t *presentationTimeUs,
uint32_t *flags,
int64_t timeoutUs) {
...
/* 发送消息查询index */
sp<AMessage> msg = new AMessage(kWhatDequeueOutputBuffer, this);
msg->setInt64("timeoutUs", timeoutUs);
sp<AMessage> response;
status_t err;
if ((err = PostAndAwaitResponse(msg, &response)) != OK) {
return err;
}
CHECK(response->findSize("index", index));
CHECK(response->findSize("offset", offset));
CHECK(response->findSize("size", size));
CHECK(response->findInt64("timeUs", presentationTimeUs));
CHECK(response->findInt32("flags", (int32_t *)flags));
...
return OK;
}
看一下kWhatDequeueOutputBuffer
的做法:
case kWhatDequeueOutputBuffer:
{
...
/* 获取输出index */
if (handleDequeueOutputBuffer(replyID, true /* new request */)) {
break;
}
...
}
可以看到函数是通过handleDequeueOutputBuffer
拿到index的:
bool MediaCodec::handleDequeueOutputBuffer(const sp<AReplyToken> &replyID, bool newRequest) {
...
else {
sp<AMessage> response = new AMessage;
ssize_t index = dequeuePortBuffer(kPortIndexOutput);
if (index < 0) {
CHECK_EQ(index, -EAGAIN);
return false;
}
const sp<MediaCodecBuffer> &buffer =
mPortBuffers[kPortIndexOutput][index].mData;
response->setSize("index", index);
response->setSize("offset", buffer->offset());
response->setSize("size", buffer->size());
int64_t timeUs;
CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
statsBufferReceived(timeUs);
response->setInt64("timeUs", timeUs);
int32_t flags;
CHECK(buffer->meta()->findInt32("flags", &flags));
response->setInt32("flags", flags);
response->postReply(replyID);
}
return true;
}
再次跟进到dequeuePortBuffer
:
ssize_t MediaCodec::dequeuePortBuffer(int32_t portIndex) {
CHECK(portIndex == kPortIndexInput || portIndex == kPortIndexOutput);
/* 1.获取buffer队列的临时指针对象 */
List<size_t> *availBuffers = &mAvailPortBuffers[portIndex];
if (availBuffers->empty()) {
return -EAGAIN;
}
/* 2.获得队首index */
size_t index = *availBuffers->begin();
/* 3.将此index从队列中删除 */
availBuffers->erase(availBuffers->begin());
/* 4.通过index拿到对应的buffer信息并填充 */
BufferInfo *info = &mPortBuffers[portIndex][index];
CHECK(!info->mOwnedByClient);
{