topical media & game development

talk show tell print

hush-src-multi-BaseClasses-outputq.cpp / cpp



  //------------------------------------------------------------------------------
  // File: OutputQ.cpp
  //
  // Desc: DirectShow base classes - implements COutputQueue class used by an
  //       output pin which may sometimes want to queue output samples on a
  //       separate thread and sometimes call Receive() directly on the input
  //       pin.
  //
  // Copyright (c) Microsoft Corporation.  All rights reserved.
  //------------------------------------------------------------------------------
  
  include <streams.h>
  
  //
  //  COutputQueue Constructor :
  //
  //  Determines if a thread is to be created and creates resources
  //
  //     pInputPin  - the downstream input pin we're queueing samples to
  //
  //     phr        - changed to a failure code if this function fails
  //                  (otherwise unchanged)
  //
  //     bAuto      - Ask pInputPin if it can block in Receive by calling
  //                  its ReceiveCanBlock method and create a thread if
  //                  it can block, otherwise not.
  //
  //     bQueue     - if bAuto == FALSE then we create a thread if and only
  //                  if bQueue == TRUE
  //
  //     lBatchSize - work in batches of lBatchSize
  //
  //     bBatchExact - Use exact batch sizes so don't send until the
  //                  batch is full or SendAnyway() is called
  //
  //     lListSize  - If we create a thread make the list of samples queued
  //                  to the thread have this size cache
  //
  //     dwPriority - If we create a thread set its priority to this
  //
  COutputQueue::COutputQueue(
               IPin         *pInputPin,          //  Pin to send stuff to
               HRESULT      *phr,                //  'Return code'
               BOOL          bAuto,              //  Ask pin if queue or not
               BOOL          bQueue,             //  Send through queue
               LONG          lBatchSize,         //  Batch
               BOOL          bBatchExact,        //  Batch exactly to BatchSize
               LONG          lListSize,
               DWORD         dwPriority,
               bool          bFlushingOpt        // flushing optimization
              ) : m_lBatchSize(lBatchSize),
                  m_bBatchExact(bBatchExact && (lBatchSize > 1)),
                  m_hThread(NULL),
                  m_hSem(NULL),
                  m_List(NULL),
                  m_pPin(pInputPin),
                  m_ppSamples(NULL),
                  m_lWaiting(0),
                  m_pInputPin(NULL),
                  m_bSendAnyway(FALSE),
                  m_nBatched(0),
                  m_bFlushing(FALSE),
                  m_bFlushed(TRUE),
                  m_bFlushingOpt(bFlushingOpt),
                  m_bTerminate(FALSE),
                  m_hEventPop(NULL),
                  m_hr(S_OK)
  {
      ASSERT(m_lBatchSize > 0);

      //  An error already stored in *phr by the caller is left alone and
      //  no resources are acquired
      if (FAILED(*phr)) {
          return;
      }

      //  Fetch and keep the IMemInputPin interface of the downstream pin;
      //  the destructor releases it
      *phr = pInputPin->QueryInterface(IID_IMemInputPin,
                                       (void **)&m_pInputPin);
      if (FAILED(*phr)) {
          return;
      }

      //  In automatic mode the pin's answer overrides bQueue, but only
      //  when ReceiveCanBlock() itself succeeded
      if (bAuto) {
          const HRESULT hrCanBlock = m_pInputPin->ReceiveCanBlock();
          if (SUCCEEDED(hrCanBlock)) {
              bQueue = (S_OK == hrCanBlock);
          }
      }

      //  The batch array is needed whether or not a thread is used
      m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
      if (NULL == m_ppSamples) {
          *phr = E_OUTOFMEMORY;
          return;
      }

      //  Direct delivery needs nothing more
      if (!bQueue) {
          DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
          return;
      }

      //  Queued delivery : semaphore, then sample list, then the thread
      DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));

      m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
      if (NULL == m_hSem) {
          DWORD dwLastError = GetLastError();
          *phr = AmHresultFromWin32(dwLastError);
          return;
      }

      m_List = new CSampleList(NAME("Sample Queue List"),
                               lListSize,
                               FALSE         // No lock
                              );
      if (NULL == m_List) {
          *phr = E_OUTOFMEMORY;
          return;
      }

      DWORD idThread;
      m_hThread = CreateThread(NULL,
                               0,
                               InitialThreadProc,
                               (LPVOID)this,
                               0,
                               &idThread);
      if (NULL == m_hThread) {
          DWORD dwLastError = GetLastError();
          *phr = AmHresultFromWin32(dwLastError);
          return;
      }

      SetThreadPriority(m_hThread, dwPriority);
  }
  
  //
  //  COutputQueue Destructor :
  //
  //  Free all resources -
  //
  //      Thread,
  //      Batched samples
  //
  COutputQueue::~COutputQueue()
  {
      DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));

      if (m_hThread != NULL) {
          //  Ask the worker thread to exit.  The flags are changed and the
          //  thread is woken while holding the critical section; the lock
          //  is dropped before waiting so the thread can take it.
          {
              CAutoLock lck(this);
              m_bTerminate = TRUE;
              m_hr = S_FALSE;
              NotifyThread();
          }
          DbgWaitForSingleObject(m_hThread);
          EXECUTE_ASSERT(CloseHandle(m_hThread));

          //  The thread frees the samples when asked to terminate

          ASSERT(m_List->GetCount() == 0);
      } else {
          //  No worker thread - release anything still held ourselves
          FreeSamples();
      }

      //  The constructor allocates the list before it creates the thread,
      //  so a failed CreateThread leaves a list with no thread.  Delete it
      //  on both paths (deleting NULL is harmless) so it cannot leak.
      delete m_List;

      //  Release the cached IMemInputPin only now, once the worker thread
      //  (which calls through it) is guaranteed to have finished.
      if (m_pInputPin != NULL) {
          m_pInputPin->Release();
      }

      if (m_hSem != NULL) {
          EXECUTE_ASSERT(CloseHandle(m_hSem));
      }
      delete [] m_ppSamples;
  }
  
  //
  //  Call the real thread proc as a member function
  //
  DWORD WINAPI COutputQueue::InitialThreadProc(LPVOID pv)
  {
      //  pv is the COutputQueue passed to CreateThread by the constructor.
      //  COM is initialised through the CAMThread helper before the member
      //  thread procedure runs.
      const HRESULT hrComInit = CAMThread::CoInitializeHelper();

      COutputQueue * const pQueue = static_cast<COutputQueue *>(pv);
      const DWORD dwExitCode = pQueue->ThreadProc();

      //  Balance the initialisation only when the helper returned S_OK
      if (S_OK == hrComInit) {
          CoUninitialize();
      }

      return dwExitCode;
  }
  
  //
  //  Thread sending the samples downstream :
  //
  //  When there is nothing to do the thread sets m_lWaiting (while
  //  holding the critical section) and then waits for m_hSem to be
  //  set (not holding the critical section)
  //
  DWORD COutputQueue::ThreadProc()
  {
      //  Worker loop.  Each pass collects a batch from m_List under the
      //  critical section, then (outside the lock) either waits on m_hSem
      //  or delivers the batch and acts on any control packet that ended
      //  the collection.  Returns 0 once m_bTerminate is seen.
      while (TRUE) {
          BOOL          bWait = FALSE;
          IMediaSample *pSample = NULL;
          LONG          lNumberToSend = 0; // Local copy
          NewSegmentPacket* ppacket = NULL; // Only set for NEW_SEGMENT

          //
          //  Get a batch of samples and send it if possible
          //  In any case exit the loop if there is a control action
          //  requested
          //
          {
              CAutoLock lck(this);
              while (TRUE) {

                  if (m_bTerminate) {
                      FreeSamples();
                      return 0;
                  }
                  if (m_bFlushing) {
                      FreeSamples();
                      SetEvent(m_evFlushComplete);
                  }

                  //  Get a sample off the list

                  pSample = m_List->RemoveHead();
                  // inform derived class we took something off the queue
                  if (m_hEventPop) {
                      //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                      SetEvent(m_hEventPop);
                  }

                  if (pSample != NULL &&
                      !IsSpecialSample(pSample)) {

                      //  If its just a regular sample just add it to the batch
                      //  and exit the loop if the batch is full

                      m_ppSamples[m_nBatched++] = pSample;
                      if (m_nBatched == m_lBatchSize) {
                          break;
                      }
                  } else {

                      //  If there was nothing in the queue and there's nothing
                      //  to send (either because there's nothing or the batch
                      //  isn't full) then prepare to wait

                      if (pSample == NULL &&
                          (m_bBatchExact || m_nBatched == 0)) {

                          //  Tell other thread to set the event when there's
                          //  something to do

                          ASSERT(m_lWaiting == 0);
                          m_lWaiting++;
                          bWait      = TRUE;
                      } else {

                          //  We break out of the loop on SEND_PACKET unless
                          //  there's nothing to send

                          if (pSample == SEND_PACKET && m_nBatched == 0) {
                              continue;
                          }

                          if (pSample == NEW_SEGMENT) {
                              // now we need the parameters - we are
                              // guaranteed that the next packet contains them
                              ppacket = (NewSegmentPacket *) m_List->RemoveHead();
                              // we took something off the queue
                              if (m_hEventPop) {
                                      //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                                      SetEvent(m_hEventPop);
                              }

                              ASSERT(ppacket);
                          }
                          //  EOS_PACKET falls through here and we exit the loop
                          //  In this way it acts like SEND_PACKET
                      }
                      break;
                  }
              }
              if (!bWait) {
                  // We look at m_nBatched from the client side so keep
                  // it up to date inside the critical section
                  lNumberToSend = m_nBatched;  // Local copy
                  m_nBatched = 0;
              }
          }

          //  Wait for some more data

          if (bWait) {
              DbgWaitForSingleObject(m_hSem);
              continue;
          }

          //  OK - send it if there's anything to send
          //  We DON'T check m_bBatchExact here because either we've got
          //  a full batch or we dropped through because we got
          //  SEND_PACKET or EOS_PACKET - both of which imply we should
          //  flush our batch

          if (lNumberToSend != 0) {
              long nProcessed;
              if (m_hr == S_OK) {
                  ASSERT(!m_bFlushed);
                  HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                                                            lNumberToSend,
                                                            &nProcessed);
                  /*  Don't overwrite a flushing state HRESULT */
                  CAutoLock lck(this);
                  if (m_hr == S_OK) {
                      m_hr = hr;
                  }
                  ASSERT(!m_bFlushed);
              }
              //  The batch is always released, delivered or not
              while (lNumberToSend != 0) {
                  m_ppSamples[--lNumberToSend]->Release();
              }
              if (m_hr != S_OK) {

                  //  In any case wait for more data - S_OK just
                  //  means there wasn't an error

                  DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
                         m_hr));
              }
          }

          //  Check for end of stream

          if (pSample == EOS_PACKET) {

              //  We don't send even end of stream on if we've previously
              //  returned something other than S_OK
              //  This is because in that case the pin which returned
              //  something other than S_OK should have either sent
              //  EndOfStream() or notified the filter graph

              if (m_hr == S_OK) {
                  DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
                  HRESULT hr = m_pPin->EndOfStream();
                  if (FAILED(hr)) {
                      //  The format string consumes one argument - supply it
                      DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"),
                             hr));
                  }
              }
          }

          //  Data from a new source

          if (pSample == RESET_PACKET) {
              m_hr = S_OK;
              SetEvent(m_evFlushComplete);
          }

          //  Pass the queued segment parameters downstream and free the
          //  packet that carried them

          if (pSample == NEW_SEGMENT) {
              m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
              delete ppacket;
          }
      }
  }
  
  //  Send batched stuff anyway
  void COutputQueue::SendAnyway()
  {
      if (!IsQueued()) {
  
          //  m_bSendAnyway is a private parameter checked in ReceiveMultiple
  
          m_bSendAnyway = TRUE;
          LONG nProcessed;
          ReceiveMultiple(NULL, 0, &nProcessed);
          m_bSendAnyway = FALSE;
  
      } else {
          CAutoLock lck(this);
          QueueSample(SEND_PACKET);
          NotifyThread();
      }
  }
  
  //
  //  Pass a new segment downstream, in order with the data
  //
  //      tStart, tStop, dRate - forwarded unchanged to the downstream
  //                             pin's NewSegment()
  //
  //  Nothing is sent or queued when the sticky return code is not S_OK
  //
  void
  COutputQueue::NewSegment(
      REFERENCE_TIME tStart,
      REFERENCE_TIME tStop,
      double dRate)
  {
      if (!IsQueued()) {
          if (S_OK == m_hr) {
              //  Push out any partial batch first so the segment does
              //  not overtake samples that are still batched
              if (m_bBatchExact) {
                  SendAnyway();
              }
              m_pPin->NewSegment(tStart, tStop, dRate);
          }
          return;
      }

      if (m_hr != S_OK) {
          return;
      }

      //
      // we need to queue the new segment to appear in order in the
      // data, but we need to pass parameters to it. Rather than
      // take the hit of wrapping every single sample so we can tell
      // special ones apart, we queue special pointers to indicate
      // special packets, and we guarantee (by holding the
      // critical section) that the packet immediately following a
      // NEW_SEGMENT value is a NewSegmentPacket containing the
      // parameters.
      NewSegmentPacket * ppack = new NewSegmentPacket;
      if (ppack == NULL) {
          return;
      }
      ppack->tStart = tStart;
      ppack->tStop = tStop;
      ppack->dRate = dRate;

      CAutoLock lck(this);

      //  The marker and its parameter packet must go on the list as a
      //  pair.  QueueSample() is not used here because on an AddTail
      //  failure it would call Release() on the parameter packet, which
      //  is not a media sample.
      if (NULL == m_List->AddTail(NEW_SEGMENT)) {
          delete ppack;
          return;
      }
      if (NULL == m_List->AddTail((IMediaSample *) ppack)) {
          //  Take the marker back off again : we hold the critical
          //  section, so it is still the tail entry
          m_List->RemoveTail();
          delete ppack;
          return;
      }
      NotifyThread();
  }
  
  //
  //  End of Stream is queued to output device
  //
  void COutputQueue::EOS()
  {
      //  The critical section is held for the whole call
      CAutoLock lck(this);
      if (!IsQueued()) {
          //  Direct mode : push out any partial batch, then signal end
          //  of stream downstream unless the sticky code is not S_OK
          if (m_bBatchExact) {
              SendAnyway();
          }
          if (m_hr == S_OK) {
              DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
              m_bFlushed = FALSE;
              HRESULT hr = m_pPin->EndOfStream();
              if (FAILED(hr)) {
                  //  The format string consumes one argument - supply it
                  DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"),
                         hr));
              }
          }
      } else {
          //  Queued mode : the worker thread sends EndOfStream() when it
          //  reaches the EOS_PACKET marker
          if (m_hr == S_OK) {
              m_bFlushed = FALSE;
              QueueSample(EOS_PACKET);
              NotifyThread();
          }
      }
  }
  
  //
  //  Flush all the samples in the queue
  //
  void COutputQueue::BeginFlush()
  {
      if (!IsQueued()) {
          //  Direct mode : tell the downstream pin first (avoids
          //  deadlocks), then mark ourselves as flushing under the lock
          m_pPin->BeginFlush();

          CAutoLock lck(this);
          m_bFlushing = TRUE;

          //  A sticky S_FALSE makes every later sample get discarded
          if (S_OK == m_hr) {
              m_hr = S_FALSE;
          }
          return;
      }

      //  Queued mode.  Blocking of receives is assumed to be done by the
      //  filter in which we are a component.
      {
          CAutoLock lck(this);

          //  From here on all queued data is discarded
          m_bFlushing = TRUE;
          if (S_OK == m_hr) {
              m_hr = S_FALSE;
          }

          //  With the optimization enabled, and nothing received since
          //  the last flush, there is no need to call downstream again
          if (m_bFlushingOpt && m_bFlushed) {
              return;
          }

          //  EndFlush() must really wait for the thread, so clear the
          //  completion event before waking it
          m_evFlushComplete.Reset();
          NotifyThread();
      }

      //  The lock has been dropped - now pass the flush downstream
      m_pPin->BeginFlush();
  }
  
  //
  // leave flush mode - pass this downstream
  void COutputQueue::EndFlush()
  {
      {
          CAutoLock lck(this);
          ASSERT(m_bFlushing);

          //  Mirror of the short cut in BeginFlush() : nothing was passed
          //  downstream, so only our own state needs restoring
          const bool bShortCut = m_bFlushingOpt && m_bFlushed && IsQueued();
          if (bShortCut) {
              m_bFlushing = FALSE;
              m_hr = S_OK;
              return;
          }
      }

      //  Synchronising with the pushing thread and making sure no more
      //  data goes downstream were both arranged by BeginFlush().
      //
      //  The critical section is deliberately NOT held from here on :
      //  waiting for the thread while holding it would deadlock.

      if (!IsQueued()) {
          FreeSamples();
      } else {
          m_evFlushComplete.Wait();
      }

      //  The caller guarantees that no samples arrive before EndFlush()
      //  returns, so the flags are changed without the lock

      m_bFlushing = FALSE;
      m_bFlushed  = TRUE;

      //  Tell the downstream pin, then clear the sticky return code

      m_pPin->EndFlush();

      m_hr = S_OK;
  }
  
  //  COutputQueue::QueueSample
  //
  //  private method to Send a sample to the output queue
  //  The critical section MUST be held when this is called
  
  void COutputQueue::QueueSample(IMediaSample *pSample)
  {
      //  Normal case : the entry went onto the tail of the list
      if (m_List->AddTail(pSample) != NULL) {
          return;
      }

      //  The list refused the entry.  Special marker values are not real
      //  samples, so only a genuine sample is Release()'d.
      if (IsSpecialSample(pSample)) {
          return;
      }
      pSample->Release();
  }
  
  //
  //  COutputQueue::Receive()
  //
  //  Send a single sample by the multiple sample route
  //  (NOTE - this could be optimized if necessary)
  //
  //  On return the sample will have been Release()'d
  //
  
  HRESULT COutputQueue::Receive(IMediaSample *pSample)
  {
      //  Treat the sample as an array of one; the processed count that
      //  ReceiveMultiple() reports is not used
      LONG cDelivered;
      const HRESULT hrResult = ReceiveMultiple(&pSample, 1, &cDelivered);
      return hrResult;
  }
  
  //
  //  COutputQueue::ReceiveMultiple()
  //
  //  Send a set of samples to the downstream pin
  //
  //      ppSamples           - array of samples
  //      nSamples            - how many
  //      nSamplesProcessed   - How many were processed
  //
  //  On return all samples will have been Release()'d
  //
  
  HRESULT COutputQueue::ReceiveMultiple (
      IMediaSample **ppSamples,
      long nSamples,
      long *nSamplesProcessed)
  {
      //  The critical section is held for the whole call, including the
      //  direct call to the downstream pin
      CAutoLock lck(this);
      //  Either call directly or queue up the samples

      if (!IsQueued()) {

          //  If we already had a bad return code then just return

          if (S_OK != m_hr) {

              //  If we've never received anything since the last Flush()
              //  and the sticky return code is not S_OK we must be
              //  flushing
              //  ((!A || B) is equivalent to A implies B)
              ASSERT(!m_bFlushed || m_bFlushing);

              //  We're supposed to Release() them anyway!
              *nSamplesProcessed = 0;

              //  Log once for the whole call rather than once per sample
              DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
                      nSamples, m_hr));
              for (long i = 0; i < nSamples; i++) {
                  ppSamples[i]->Release();
              }

              return m_hr;
          }
          //
          //  If we're flushing the sticky return code should be S_FALSE
          //
          ASSERT(!m_bFlushing);
          m_bFlushed = FALSE;

          ASSERT(m_nBatched < m_lBatchSize);
          ASSERT(m_nBatched == 0 || m_bBatchExact);

          //  Loop processing the samples in batches

          LONG iLost = 0;
          long iDone;
          for ( iDone = 0;
               iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
              ) {

  //pragma message (REMIND("Implement threshold scheme"))
              ASSERT(m_nBatched < m_lBatchSize);
              if (iDone < nSamples) {
                  m_ppSamples[m_nBatched++] = ppSamples[iDone++];
              }

              //  Deliver when the batch is full, or when every incoming
              //  sample has been taken and a partial batch may go out
              //  (SendAnyway() in progress, or inexact batching).
              //
              //  The second test used to be "nSamples == 0", which left
              //  samples batched after the call in inexact mode with a
              //  batch size above 1 (contradicting the assertion on
              //  entry) and could never end this loop if m_bSendAnyway
              //  was set while nSamples was non-zero.
              if (m_nBatched == m_lBatchSize ||
                  (iDone >= nSamples && (m_bSendAnyway || !m_bBatchExact))) {

                  //  Start from 0 so that a failing downstream call that
                  //  leaves the count untouched counts the batch as lost
                  LONG nDone = 0;
                  DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
                         m_nBatched));

                  if (m_hr == S_OK) {
                      m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
                                                          m_nBatched,
                                                          &nDone);
                  } else {
                      nDone = 0;
                  }
                  iLost += m_nBatched - nDone;
                  for (LONG i = 0; i < m_nBatched; i++) {
                      m_ppSamples[i]->Release();
                  }
                  m_nBatched = 0;
              }
          }
          *nSamplesProcessed = iDone - iLost;
          if (*nSamplesProcessed < 0) {
              *nSamplesProcessed = 0;
          }
          return m_hr;
      } else {
          /*  We're sending to our thread */

          if (m_hr != S_OK) {
              *nSamplesProcessed = 0;
              DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
                      nSamples, m_hr));
              for (long i = 0; i < nSamples; i++) {
                  ppSamples[i]->Release();
              }
              return m_hr;
          }
          m_bFlushed = FALSE;
          for (long i = 0; i < nSamples; i++) {
              QueueSample(ppSamples[i]);
          }
          *nSamplesProcessed = nSamples;

          //  With exact batching the thread is only woken once a full
          //  batch is available
          if (!m_bBatchExact ||
              m_nBatched + m_List->GetCount() >= m_lBatchSize) {
              NotifyThread();
          }
          return S_OK;
      }
  }
  
  //  Get ready for new data - cancels sticky m_hr
  void COutputQueue::Reset()
  {
      if (!IsQueued()) {
          m_hr = S_OK;
      } else {
          //  Queue the reset marker and wake the thread under the lock ...
          {
              CAutoLock lck(this);
              QueueSample(RESET_PACKET);
              NotifyThread();
          }

          //  ... but wait for it to be processed with the lock released.
          //  The worker thread takes the critical section to pull entries
          //  off the list, so waiting here while still holding it would
          //  deadlock.  The thread sets m_evFlushComplete after it has
          //  handled RESET_PACKET.
          m_evFlushComplete.Wait();
      }
  }
  
  //  Remove and Release() all queued and Batched samples
  void COutputQueue::FreeSamples()
  {
      CAutoLock lck(this);
      if (IsQueued()) {
          while (TRUE) {
              IMediaSample *pSample = m_List->RemoveHead();
              // inform derived class we took something off the queue
              if (m_hEventPop) {
                  //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                  SetEvent(m_hEventPop);
              }
  
              if (pSample == NULL) {
                  break;
              }
              if (!IsSpecialSample(pSample)) {
                  pSample->Release();
              } else {
                  if (pSample == NEW_SEGMENT) {
                      //  Free NEW_SEGMENT packet
                      NewSegmentPacket *ppacket =
                          (NewSegmentPacket *) m_List->RemoveHead();
                      // inform derived class we took something off the queue
                      if (m_hEventPop) {
                          //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
                          SetEvent(m_hEventPop);
                      }
  
                      ASSERT(ppacket != NULL);
                      delete ppacket;
                  }
              }
          }
      }
      for (int i = 0; i < m_nBatched; i++) {
          m_ppSamples[i]->Release();
      }
      m_nBatched = 0;
  }
  
  //  Notify the thread if there is something to do
  //
  //  The critical section MUST be held when this is called
  void COutputQueue::NotifyThread()
  {
      ASSERT(IsQueued());

      //  Nothing is waiting on the semaphore, so nothing to signal
      if (0 == m_lWaiting) {
          return;
      }

      //  Release the semaphore by the recorded waiter count and clear it
      ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
      m_lWaiting = 0;
  }
  
  //  See if there's any work to do
  //  Returns
  //      TRUE  if there is nothing on the queue and nothing in the batch
  //            and all data has been sent
  //      FALSE otherwise
  //
  BOOL COutputQueue::IsIdle()
  {
      CAutoLock lck(this);

      //  Busy means either
      //      a thread exists and it is not waiting for work, or
      //      samples are sitting in the current batch

      const bool bThreadBusy = IsQueued() && m_lWaiting == 0;
      if (bThreadBusy || m_nBatched != 0) {
          return FALSE;
      }

      //  An idle queue is not expected to have anything left on its
      //  work list

      ASSERT(!IsQueued() || m_List->GetCount() == 0);
      return TRUE;
  }
  
  //  Store the event handle that is signalled each time an entry is taken
  //  off the sample list; a NULL handle means no event is signalled
  void COutputQueue::SetPopEvent(HANDLE hEvent)
  {
      this->m_hEventPop = hEvent;
  }
  


(C) Æliens 20/2/2008

You may not copy or print any of this material without explicit permission of the author or the publisher. In case of other copyright issues, contact the author.