topical media & game development

talk show tell print

hush-src-multi-BaseClasses-pullpin.cpp / cpp



  //------------------------------------------------------------------------------
  // File: PullPin.cpp
  //
  // Desc: DirectShow base classes - implements CPullPin class that pulls data
  //       from IAsyncReader.
  //
  // Copyright (c) Microsoft Corporation.  All rights reserved.
  //------------------------------------------------------------------------------
  
  include <streams.h>
  include <pullpin.h>
  
  CPullPin::CPullPin()
    : m_pReader(NULL),
      m_pAlloc(NULL),
      m_State(TM_Exit)
  {
  }
  
  CPullPin::~CPullPin()
  {
      Disconnect();
  }
  
  // returns S_OK if successfully connected to an IAsyncReader interface
  // from this object
  // Optional allocator should be proposed as a preferred allocator if
  // necessary
  HRESULT
  CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
  {
      CAutoLock lock(&m_AccessLock);
  
      if (m_pReader) {
          return VFW_E_ALREADY_CONNECTED;
      }
  
      HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
      if (FAILED(hr)) {
          return(hr);
      }
  
      hr = DecideAllocator(pAlloc, NULL);
      if (FAILED(hr)) {
          Disconnect();
          return hr;
      }
  
      LONGLONG llTotal, llAvail;
      hr = m_pReader->Length(&llTotal, &llAvail);
      if (FAILED(hr)) {
          Disconnect();
          return hr;
      }
  
      // convert from file position to reference time
      m_tDuration = llTotal * UNITS;
      m_tStop = m_tDuration;
      m_tStart = 0;
  
      m_bSync = bSync;
  
      return S_OK;
  }
  
  // disconnect any connection made in Connect
  HRESULT
  CPullPin::Disconnect()
  {
      CAutoLock lock(&m_AccessLock);
  
      StopThread();
  
      if (m_pReader) {
          m_pReader->Release();
          m_pReader = NULL;
      }
  
      if (m_pAlloc) {
          m_pAlloc->Release();
          m_pAlloc = NULL;
      }
  
      return S_OK;
  }
  
  // agree an allocator using RequestAllocator - optional
  // props param specifies your requirements (non-zero fields).
  // returns an error code if fail to match requirements.
  // optional IMemAllocator interface is offered as a preferred allocator
  // but no error occurs if it can't be met.
  HRESULT
  CPullPin::DecideAllocator(
      IMemAllocator * pAlloc,
      ALLOCATOR_PROPERTIES * pProps)
  {
      ALLOCATOR_PROPERTIES *pRequest;
      ALLOCATOR_PROPERTIES Request;
      if (pProps == NULL) {
          Request.cBuffers = 3;
          Request.cbBuffer = 64*1024;
          Request.cbAlign = 0;
          Request.cbPrefix = 0;
          pRequest = &Request;
      } else {
          pRequest = pProps;
      }
      HRESULT hr = m_pReader->RequestAllocator(
                      pAlloc,
                      pRequest,
                      &m_pAlloc);
      return hr;
  }
  
  // start pulling data
  HRESULT
  CPullPin::Active(void)
  {
      ASSERT(!ThreadExists());
      return StartThread();
  }
  
  // stop pulling data
  HRESULT
  CPullPin::Inactive(void)
  {
      StopThread();
  
      return S_OK;
  }
  
  HRESULT
  CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
  {
      CAutoLock lock(&m_AccessLock);
  
      ThreadMsg AtStart = m_State;
  
      if (AtStart == TM_Start) {
          BeginFlush();
          PauseThread();
          EndFlush();
      }
  
      m_tStart = tStart;
      m_tStop = tStop;
  
      HRESULT hr = S_OK;
      if (AtStart == TM_Start) {
          hr = StartThread();
      }
  
      return hr;
  }
  
  HRESULT
  CPullPin::Duration(REFERENCE_TIME* ptDuration)
  {
      *ptDuration = m_tDuration;
      return S_OK;
  }
  
  HRESULT
  CPullPin::StartThread()
  {
      CAutoLock lock(&m_AccessLock);
  
      if (!m_pAlloc || !m_pReader) {
          return E_UNEXPECTED;
      }
  
      HRESULT hr;
      if (!ThreadExists()) {
  
          // commit allocator
          hr = m_pAlloc->Commit();
          if (FAILED(hr)) {
              return hr;
          }
  
          // start thread
          if (!Create()) {
              return E_FAIL;
          }
      }
  
      m_State = TM_Start;
      hr = (HRESULT) CallWorker(m_State);
      return hr;
  }
  
  HRESULT
  CPullPin::PauseThread()
  {
      CAutoLock lock(&m_AccessLock);
  
      if (!ThreadExists()) {
          return E_UNEXPECTED;
      }
  
      // need to flush to ensure the thread is not blocked
      // in WaitForNext
      HRESULT hr = m_pReader->BeginFlush();
      if (FAILED(hr)) {
          return hr;
      }
  
      m_State = TM_Pause;
      hr = CallWorker(TM_Pause);
  
      m_pReader->EndFlush();
      return hr;
  }
  
  HRESULT
  CPullPin::StopThread()
  {
      CAutoLock lock(&m_AccessLock);
  
      if (!ThreadExists()) {
          return S_FALSE;
      }
  
      // need to flush to ensure the thread is not blocked
      // in WaitForNext
      HRESULT hr = m_pReader->BeginFlush();
      if (FAILED(hr)) {
          return hr;
      }
  
      m_State = TM_Exit;
      hr = CallWorker(TM_Exit);
  
      m_pReader->EndFlush();
  
      // wait for thread to completely exit
      Close();
  
      // decommit allocator
      if (m_pAlloc) {
          m_pAlloc->Decommit();
      }
  
      return S_OK;
  }
  
  DWORD
  CPullPin::ThreadProc(void)
  {
      while(1) {
          DWORD cmd = GetRequest();
          switch(cmd) {
          case TM_Exit:
              Reply(S_OK);
              return 0;
  
          case TM_Pause:
              // we are paused already
              Reply(S_OK);
              break;
  
          case TM_Start:
              Reply(S_OK);
              Process();
              break;
          }
  
          // at this point, there should be no outstanding requests on the
          // upstream filter.
          // We should force begin/endflush to ensure that this is true.
          // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
          // on another thread, but the premature EndFlush will do no harm now
          // that we are idle.
          m_pReader->BeginFlush();
          CleanupCancelled();
          m_pReader->EndFlush();
      }
  }
  
  HRESULT
  CPullPin::QueueSample(
      REFERENCE_TIME& tCurrent,
      REFERENCE_TIME tAlignStop,
      BOOL bDiscontinuity
      )
  {
      IMediaSample* pSample;
  
      HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
      if (FAILED(hr)) {
          return hr;
      }
  
      LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
      if (tStopThis > tAlignStop) {
          tStopThis = tAlignStop;
      }
      pSample->SetTime(&tCurrent, &tStopThis);
      tCurrent = tStopThis;
  
      pSample->SetDiscontinuity(bDiscontinuity);
  
      hr = m_pReader->Request(
                          pSample,
                          0);
      if (FAILED(hr)) {
          pSample->Release();
  
          CleanupCancelled();
          OnError(hr);
      }
      return hr;
  }
  
  HRESULT
  CPullPin::CollectAndDeliver(
      REFERENCE_TIME tStart,
      REFERENCE_TIME tStop)
  {
      IMediaSample* pSample = NULL;   // better be sure pSample is set
      DWORD_PTR dwUnused;
      HRESULT hr = m_pReader->WaitForNext(
                          INFINITE,
                          &pSample,
                          &dwUnused);
      if (FAILED(hr)) {
          if (pSample) {
              pSample->Release();
          }
      } else {
          hr = DeliverSample(pSample, tStart, tStop);
      }
      if (FAILED(hr)) {
          CleanupCancelled();
          OnError(hr);
      }
      return hr;
  
  }
  
  HRESULT
  CPullPin::DeliverSample(
      IMediaSample* pSample,
      REFERENCE_TIME tStart,
      REFERENCE_TIME tStop
      )
  {
      // fix up sample if past actual stop (for sector alignment)
      REFERENCE_TIME t1, t2;
      pSample->GetTime(&t1, &t2);
      if (t2 > tStop) {
          t2 = tStop;
      }
  
      // adjust times to be relative to (aligned) start time
      t1 -= tStart;
      t2 -= tStart;
      pSample->SetTime(&t1, &t2);
  
      HRESULT hr = Receive(pSample);
      pSample->Release();
      return hr;
  }
  
  void
  CPullPin::Process(void)
  {
      // is there anything to do?
      if (m_tStop <= m_tStart) {
          EndOfStream();
          return;
      }
  
      BOOL bDiscontinuity = TRUE;
  
      // if there is more than one sample at the allocator,
      // then try to queue 2 at once in order to overlap.
      // -- get buffer count and required alignment
      ALLOCATOR_PROPERTIES Actual;
      HRESULT hr = m_pAlloc->GetProperties(&Actual);
  
      // align the start position downwards
      REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
      REFERENCE_TIME tCurrent = tStart;
  
      REFERENCE_TIME tStop = m_tStop;
      if (tStop > m_tDuration) {
          tStop = m_tDuration;
      }
  
      // align the stop position - may be past stop, but that
      // doesn't matter
      REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
  
      DWORD dwRequest;
  
      if (!m_bSync) {
  
          //  Break out of the loop either if we get to the end or we're asked
          //  to do something else
          while (tCurrent < tAlignStop) {
  
              // Break out without calling EndOfStream if we're asked to
              // do something different
              if (CheckRequest(&dwRequest)) {
                  return;
              }
  
              // queue a first sample
              if (Actual.cBuffers > 1) {
  
                  hr = QueueSample(tCurrent, tAlignStop, TRUE);
                  bDiscontinuity = FALSE;
  
                  if (FAILED(hr)) {
                      return;
                  }
              }
  
              // loop queueing second and waiting for first..
              while (tCurrent < tAlignStop) {
  
                  hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
                  bDiscontinuity = FALSE;
  
                  if (FAILED(hr)) {
                      return;
                  }
  
                  hr = CollectAndDeliver(tStart, tStop);
                  if (S_OK != hr) {
  
                      // stop if error, or if downstream filter said
                      // to stop.
                      return;
                  }
              }
  
              if (Actual.cBuffers > 1) {
                  hr = CollectAndDeliver(tStart, tStop);
                  if (FAILED(hr)) {
                      return;
                  }
              }
          }
      } else {
  
          // sync version of above loop
          while (tCurrent < tAlignStop) {
  
              // Break out without calling EndOfStream if we're asked to
              // do something different
              if (CheckRequest(&dwRequest)) {
                  return;
              }
  
              IMediaSample* pSample;
  
              hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
              if (FAILED(hr)) {
                  OnError(hr);
                  return;
              }
  
              LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
              if (tStopThis > tAlignStop) {
                  tStopThis = tAlignStop;
              }
              pSample->SetTime(&tCurrent, &tStopThis);
              tCurrent = tStopThis;
  
              if (bDiscontinuity) {
                  pSample->SetDiscontinuity(TRUE);
                  bDiscontinuity = FALSE;
              }
  
              hr = m_pReader->SyncReadAligned(pSample);
  
              if (FAILED(hr)) {
                  pSample->Release();
                  OnError(hr);
                  return;
              }
  
              hr = DeliverSample(pSample, tStart, tStop);
              if (hr != S_OK) {
                  if (FAILED(hr)) {
                      OnError(hr);
                  }
                  return;
              }
          }
      }
  
      EndOfStream();
  }
  
  // after a flush, cancelled i/o will be waiting for collection
  // and release
  void
  CPullPin::CleanupCancelled(void)
  {
      while (1) {
          IMediaSample * pSample;
          DWORD_PTR dwUnused;
  
          HRESULT hr = m_pReader->WaitForNext(
                              0,          // no wait
                              &pSample,
                              &dwUnused);
          if(pSample) {
              pSample->Release();
          } else {
              // no more samples
              return;
          }
      }
  }
  


(C) Æliens 20/2/2008

You may not copy or print any of this material without explicit permission of the author or the publisher. In case of other copyright issues, contact the author.