pullpin.cpp 11 KB


  1. //------------------------------------------------------------------------------
  2. // File: PullPin.cpp
  3. //
  4. // Desc: DirectShow base classes - implements CPullPin class that pulls data
  5. // from IAsyncReader.
  6. //
  7. // Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved.
  8. //------------------------------------------------------------------------------
  9. #include <streams.h>
  10. #include "pullpin.h"
  11. #ifdef DXMPERF
  12. #include "dxmperf.h"
  13. #endif // DXMPERF
  14. CPullPin::CPullPin()
  15. : m_pReader(NULL),
  16. m_pAlloc(NULL),
  17. m_State(TM_Exit)
  18. {
  19. #ifdef DXMPERF
  20. PERFLOG_CTOR( L"CPullPin", this );
  21. #endif // DXMPERF
  22. }
  23. CPullPin::~CPullPin()
  24. {
  25. Disconnect();
  26. #ifdef DXMPERF
  27. PERFLOG_DTOR( L"CPullPin", this );
  28. #endif // DXMPERF
  29. }
  30. // returns S_OK if successfully connected to an IAsyncReader interface
  31. // from this object
  32. // Optional allocator should be proposed as a preferred allocator if
  33. // necessary
  34. HRESULT
  35. CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
  36. {
  37. CAutoLock lock(&m_AccessLock);
  38. if (m_pReader) {
  39. return VFW_E_ALREADY_CONNECTED;
  40. }
  41. HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
  42. if (FAILED(hr)) {
  43. #ifdef DXMPERF
  44. {
  45. AM_MEDIA_TYPE * pmt = NULL;
  46. PERFLOG_CONNECT( this, pUnk, hr, pmt );
  47. }
  48. #endif // DXMPERF
  49. return(hr);
  50. }
  51. hr = DecideAllocator(pAlloc, NULL);
  52. if (FAILED(hr)) {
  53. Disconnect();
  54. #ifdef DXMPERF
  55. {
  56. AM_MEDIA_TYPE * pmt = NULL;
  57. PERFLOG_CONNECT( this, pUnk, hr, pmt );
  58. }
  59. #endif // DXMPERF
  60. return hr;
  61. }
  62. LONGLONG llTotal, llAvail;
  63. hr = m_pReader->Length(&llTotal, &llAvail);
  64. if (FAILED(hr)) {
  65. Disconnect();
  66. #ifdef DXMPERF
  67. {
  68. AM_MEDIA_TYPE * pmt = NULL;
  69. PERFLOG_CONNECT( this, pUnk, hr, pmt );
  70. }
  71. #endif
  72. return hr;
  73. }
  74. // convert from file position to reference time
  75. m_tDuration = llTotal * UNITS;
  76. m_tStop = m_tDuration;
  77. m_tStart = 0;
  78. m_bSync = bSync;
  79. #ifdef DXMPERF
  80. {
  81. AM_MEDIA_TYPE * pmt = NULL;
  82. PERFLOG_CONNECT( this, pUnk, S_OK, pmt );
  83. }
  84. #endif // DXMPERF
  85. return S_OK;
  86. }
  87. // disconnect any connection made in Connect
  88. HRESULT
  89. CPullPin::Disconnect()
  90. {
  91. CAutoLock lock(&m_AccessLock);
  92. StopThread();
  93. #ifdef DXMPERF
  94. PERFLOG_DISCONNECT( this, m_pReader, S_OK );
  95. #endif // DXMPERF
  96. if (m_pReader) {
  97. m_pReader->Release();
  98. m_pReader = NULL;
  99. }
  100. if (m_pAlloc) {
  101. m_pAlloc->Release();
  102. m_pAlloc = NULL;
  103. }
  104. return S_OK;
  105. }
  106. // agree an allocator using RequestAllocator - optional
  107. // props param specifies your requirements (non-zero fields).
  108. // returns an error code if fail to match requirements.
  109. // optional IMemAllocator interface is offered as a preferred allocator
  110. // but no error occurs if it can't be met.
  111. HRESULT
  112. CPullPin::DecideAllocator(
  113. IMemAllocator * pAlloc,
  114. __inout_opt ALLOCATOR_PROPERTIES * pProps)
  115. {
  116. ALLOCATOR_PROPERTIES *pRequest;
  117. ALLOCATOR_PROPERTIES Request;
  118. if (pProps == NULL) {
  119. Request.cBuffers = 3;
  120. Request.cbBuffer = 64*1024;
  121. Request.cbAlign = 0;
  122. Request.cbPrefix = 0;
  123. pRequest = &Request;
  124. } else {
  125. pRequest = pProps;
  126. }
  127. HRESULT hr = m_pReader->RequestAllocator(
  128. pAlloc,
  129. pRequest,
  130. &m_pAlloc);
  131. return hr;
  132. }
  133. // start pulling data
  134. HRESULT
  135. CPullPin::Active(void)
  136. {
  137. ASSERT(!ThreadExists());
  138. return StartThread();
  139. }
  140. // stop pulling data
  141. HRESULT
  142. CPullPin::Inactive(void)
  143. {
  144. StopThread();
  145. return S_OK;
  146. }
  147. HRESULT
  148. CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
  149. {
  150. CAutoLock lock(&m_AccessLock);
  151. ThreadMsg AtStart = m_State;
  152. if (AtStart == TM_Start) {
  153. BeginFlush();
  154. PauseThread();
  155. EndFlush();
  156. }
  157. m_tStart = tStart;
  158. m_tStop = tStop;
  159. HRESULT hr = S_OK;
  160. if (AtStart == TM_Start) {
  161. hr = StartThread();
  162. }
  163. return hr;
  164. }
  165. HRESULT
  166. CPullPin::Duration(__out REFERENCE_TIME* ptDuration)
  167. {
  168. *ptDuration = m_tDuration;
  169. return S_OK;
  170. }
  171. HRESULT
  172. CPullPin::StartThread()
  173. {
  174. CAutoLock lock(&m_AccessLock);
  175. if (!m_pAlloc || !m_pReader) {
  176. return E_UNEXPECTED;
  177. }
  178. HRESULT hr;
  179. if (!ThreadExists()) {
  180. // commit allocator
  181. hr = m_pAlloc->Commit();
  182. if (FAILED(hr)) {
  183. return hr;
  184. }
  185. // start thread
  186. if (!Create()) {
  187. return E_FAIL;
  188. }
  189. }
  190. m_State = TM_Start;
  191. hr = (HRESULT) CallWorker(m_State);
  192. return hr;
  193. }
  194. HRESULT
  195. CPullPin::PauseThread()
  196. {
  197. CAutoLock lock(&m_AccessLock);
  198. if (!ThreadExists()) {
  199. return E_UNEXPECTED;
  200. }
  201. // need to flush to ensure the thread is not blocked
  202. // in WaitForNext
  203. HRESULT hr = m_pReader->BeginFlush();
  204. if (FAILED(hr)) {
  205. return hr;
  206. }
  207. m_State = TM_Pause;
  208. hr = CallWorker(TM_Pause);
  209. m_pReader->EndFlush();
  210. return hr;
  211. }
  212. HRESULT
  213. CPullPin::StopThread()
  214. {
  215. CAutoLock lock(&m_AccessLock);
  216. if (!ThreadExists()) {
  217. return S_FALSE;
  218. }
  219. // need to flush to ensure the thread is not blocked
  220. // in WaitForNext
  221. HRESULT hr = m_pReader->BeginFlush();
  222. if (FAILED(hr)) {
  223. return hr;
  224. }
  225. m_State = TM_Exit;
  226. hr = CallWorker(TM_Exit);
  227. m_pReader->EndFlush();
  228. // wait for thread to completely exit
  229. Close();
  230. // decommit allocator
  231. if (m_pAlloc) {
  232. m_pAlloc->Decommit();
  233. }
  234. return S_OK;
  235. }
  236. DWORD
  237. CPullPin::ThreadProc(void)
  238. {
  239. while(1) {
  240. DWORD cmd = GetRequest();
  241. switch(cmd) {
  242. case TM_Exit:
  243. Reply(S_OK);
  244. return 0;
  245. case TM_Pause:
  246. // we are paused already
  247. Reply(S_OK);
  248. break;
  249. case TM_Start:
  250. Reply(S_OK);
  251. Process();
  252. break;
  253. }
  254. // at this point, there should be no outstanding requests on the
  255. // upstream filter.
  256. // We should force begin/endflush to ensure that this is true.
  257. // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
  258. // on another thread, but the premature EndFlush will do no harm now
  259. // that we are idle.
  260. m_pReader->BeginFlush();
  261. CleanupCancelled();
  262. m_pReader->EndFlush();
  263. }
  264. }
  265. HRESULT
  266. CPullPin::QueueSample(
  267. __inout REFERENCE_TIME& tCurrent,
  268. REFERENCE_TIME tAlignStop,
  269. BOOL bDiscontinuity
  270. )
  271. {
  272. IMediaSample* pSample;
  273. HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
  274. if (FAILED(hr)) {
  275. return hr;
  276. }
  277. LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
  278. if (tStopThis > tAlignStop) {
  279. tStopThis = tAlignStop;
  280. }
  281. pSample->SetTime(&tCurrent, &tStopThis);
  282. tCurrent = tStopThis;
  283. pSample->SetDiscontinuity(bDiscontinuity);
  284. hr = m_pReader->Request(
  285. pSample,
  286. 0);
  287. if (FAILED(hr)) {
  288. pSample->Release();
  289. CleanupCancelled();
  290. OnError(hr);
  291. }
  292. return hr;
  293. }
  294. HRESULT
  295. CPullPin::CollectAndDeliver(
  296. REFERENCE_TIME tStart,
  297. REFERENCE_TIME tStop)
  298. {
  299. IMediaSample* pSample = NULL; // better be sure pSample is set
  300. DWORD_PTR dwUnused;
  301. HRESULT hr = m_pReader->WaitForNext(
  302. INFINITE,
  303. &pSample,
  304. &dwUnused);
  305. if (FAILED(hr)) {
  306. if (pSample) {
  307. pSample->Release();
  308. }
  309. } else {
  310. hr = DeliverSample(pSample, tStart, tStop);
  311. }
  312. if (FAILED(hr)) {
  313. CleanupCancelled();
  314. OnError(hr);
  315. }
  316. return hr;
  317. }
  318. HRESULT
  319. CPullPin::DeliverSample(
  320. IMediaSample* pSample,
  321. REFERENCE_TIME tStart,
  322. REFERENCE_TIME tStop
  323. )
  324. {
  325. // fix up sample if past actual stop (for sector alignment)
  326. REFERENCE_TIME t1, t2;
  327. if (S_OK == pSample->GetTime(&t1, &t2)) {
  328. if (t2 > tStop) {
  329. t2 = tStop;
  330. }
  331. // adjust times to be relative to (aligned) start time
  332. t1 -= tStart;
  333. t2 -= tStart;
  334. HRESULT hr = pSample->SetTime(&t1, &t2);
  335. if (FAILED(hr)) {
  336. return hr;
  337. }
  338. }
  339. #ifdef DXMPERF
  340. {
  341. AM_MEDIA_TYPE * pmt = NULL;
  342. pSample->GetMediaType( &pmt );
  343. PERFLOG_RECEIVE( L"CPullPin", m_pReader, this, pSample, pmt );
  344. }
  345. #endif
  346. HRESULT hr = Receive(pSample);
  347. pSample->Release();
  348. return hr;
  349. }
  350. void
  351. CPullPin::Process(void)
  352. {
  353. // is there anything to do?
  354. if (m_tStop <= m_tStart) {
  355. EndOfStream();
  356. return;
  357. }
  358. BOOL bDiscontinuity = TRUE;
  359. // if there is more than one sample at the allocator,
  360. // then try to queue 2 at once in order to overlap.
  361. // -- get buffer count and required alignment
  362. ALLOCATOR_PROPERTIES Actual;
  363. HRESULT hr = m_pAlloc->GetProperties(&Actual);
  364. // align the start position downwards
  365. REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
  366. REFERENCE_TIME tCurrent = tStart;
  367. REFERENCE_TIME tStop = m_tStop;
  368. if (tStop > m_tDuration) {
  369. tStop = m_tDuration;
  370. }
  371. // align the stop position - may be past stop, but that
  372. // doesn't matter
  373. REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
  374. DWORD dwRequest;
  375. if (!m_bSync) {
  376. // Break out of the loop either if we get to the end or we're asked
  377. // to do something else
  378. while (tCurrent < tAlignStop) {
  379. // Break out without calling EndOfStream if we're asked to
  380. // do something different
  381. if (CheckRequest(&dwRequest)) {
  382. return;
  383. }
  384. // queue a first sample
  385. if (Actual.cBuffers > 1) {
  386. hr = QueueSample(tCurrent, tAlignStop, TRUE);
  387. bDiscontinuity = FALSE;
  388. if (FAILED(hr)) {
  389. return;
  390. }
  391. }
  392. // loop queueing second and waiting for first..
  393. while (tCurrent < tAlignStop) {
  394. hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
  395. bDiscontinuity = FALSE;
  396. if (FAILED(hr)) {
  397. return;
  398. }
  399. hr = CollectAndDeliver(tStart, tStop);
  400. if (S_OK != hr) {
  401. // stop if error, or if downstream filter said
  402. // to stop.
  403. return;
  404. }
  405. }
  406. if (Actual.cBuffers > 1) {
  407. hr = CollectAndDeliver(tStart, tStop);
  408. if (FAILED(hr)) {
  409. return;
  410. }
  411. }
  412. }
  413. } else {
  414. // sync version of above loop
  415. while (tCurrent < tAlignStop) {
  416. // Break out without calling EndOfStream if we're asked to
  417. // do something different
  418. if (CheckRequest(&dwRequest)) {
  419. return;
  420. }
  421. IMediaSample* pSample;
  422. hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
  423. if (FAILED(hr)) {
  424. OnError(hr);
  425. return;
  426. }
  427. LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
  428. if (tStopThis > tAlignStop) {
  429. tStopThis = tAlignStop;
  430. }
  431. pSample->SetTime(&tCurrent, &tStopThis);
  432. tCurrent = tStopThis;
  433. if (bDiscontinuity) {
  434. pSample->SetDiscontinuity(TRUE);
  435. bDiscontinuity = FALSE;
  436. }
  437. hr = m_pReader->SyncReadAligned(pSample);
  438. if (FAILED(hr)) {
  439. pSample->Release();
  440. OnError(hr);
  441. return;
  442. }
  443. hr = DeliverSample(pSample, tStart, tStop);
  444. if (hr != S_OK) {
  445. if (FAILED(hr)) {
  446. OnError(hr);
  447. }
  448. return;
  449. }
  450. }
  451. }
  452. EndOfStream();
  453. }
  454. // after a flush, cancelled i/o will be waiting for collection
  455. // and release
  456. void
  457. CPullPin::CleanupCancelled(void)
  458. {
  459. while (1) {
  460. IMediaSample * pSample;
  461. DWORD_PTR dwUnused;
  462. HRESULT hr = m_pReader->WaitForNext(
  463. 0, // no wait
  464. &pSample,
  465. &dwUnused);
  466. if(pSample) {
  467. pSample->Release();
  468. } else {
  469. // no more samples
  470. return;
  471. }
  472. }
  473. }