1
0

outputq.cpp 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. //------------------------------------------------------------------------------
  2. // File: OutputQ.cpp
  3. //
  4. // Desc: DirectShow base classes - implements COutputQueue class used by an
  5. // output pin which may sometimes want to queue output samples on a
  6. // separate thread and sometimes call Receive() directly on the input
  7. // pin.
  8. //
  9. // Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved.
  10. //------------------------------------------------------------------------------
  11. #include <streams.h>
  12. //
  13. // COutputQueue Constructor :
  14. //
  15. // Determines if a thread is to be created and creates resources
  16. //
  17. // pInputPin - the downstream input pin we're queueing samples to
  18. //
  19. // phr - changed to a failure code if this function fails
  20. // (otherwise unchanges)
  21. //
  22. // bAuto - Ask pInputPin if it can block in Receive by calling
  23. // its ReceiveCanBlock method and create a thread if
  24. // it can block, otherwise not.
  25. //
  26. // bQueue - if bAuto == FALSE then we create a thread if and only
  27. // if bQueue == TRUE
  28. //
  29. // lBatchSize - work in batches of lBatchSize
  30. //
  31. // bBatchEact - Use exact batch sizes so don't send until the
  32. // batch is full or SendAnyway() is called
  33. //
  34. // lListSize - If we create a thread make the list of samples queued
  35. // to the thread have this size cache
  36. //
  37. // dwPriority - If we create a thread set its priority to this
  38. //
  39. COutputQueue::COutputQueue(
  40. IPin *pInputPin, // Pin to send stuff to
  41. __inout HRESULT *phr, // 'Return code'
  42. BOOL bAuto, // Ask pin if queue or not
  43. BOOL bQueue, // Send through queue
  44. LONG lBatchSize, // Batch
  45. BOOL bBatchExact, // Batch exactly to BatchSize
  46. LONG lListSize,
  47. DWORD dwPriority,
  48. bool bFlushingOpt // flushing optimization
  49. ) : m_lBatchSize(lBatchSize),
  50. m_bBatchExact(bBatchExact && (lBatchSize > 1)),
  51. m_hThread(NULL),
  52. m_hSem(NULL),
  53. m_List(NULL),
  54. m_pPin(pInputPin),
  55. m_ppSamples(NULL),
  56. m_lWaiting(0),
  57. m_evFlushComplete(FALSE, phr),
  58. m_pInputPin(NULL),
  59. m_bSendAnyway(FALSE),
  60. m_nBatched(0),
  61. m_bFlushing(FALSE),
  62. m_bFlushed(TRUE),
  63. m_bFlushingOpt(bFlushingOpt),
  64. m_bTerminate(FALSE),
  65. m_hEventPop(NULL),
  66. m_hr(S_OK)
  67. {
  68. ASSERT(m_lBatchSize > 0);
  69. if (FAILED(*phr)) {
  70. return;
  71. }
  72. // Check the input pin is OK and cache its IMemInputPin interface
  73. *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
  74. if (FAILED(*phr)) {
  75. return;
  76. }
  77. // See if we should ask the downstream pin
  78. if (bAuto) {
  79. HRESULT hr = m_pInputPin->ReceiveCanBlock();
  80. if (SUCCEEDED(hr)) {
  81. bQueue = hr == S_OK;
  82. }
  83. }
  84. // Create our sample batch
  85. m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
  86. if (m_ppSamples == NULL) {
  87. *phr = E_OUTOFMEMORY;
  88. return;
  89. }
  90. // If we're queueing allocate resources
  91. if (bQueue) {
  92. DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
  93. m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
  94. if (m_hSem == NULL) {
  95. DWORD dwError = GetLastError();
  96. *phr = AmHresultFromWin32(dwError);
  97. return;
  98. }
  99. m_List = new CSampleList(NAME("Sample Queue List"),
  100. lListSize,
  101. FALSE // No lock
  102. );
  103. if (m_List == NULL) {
  104. *phr = E_OUTOFMEMORY;
  105. return;
  106. }
  107. DWORD dwThreadId;
  108. m_hThread = CreateThread(NULL,
  109. 0,
  110. InitialThreadProc,
  111. (LPVOID)this,
  112. 0,
  113. &dwThreadId);
  114. if (m_hThread == NULL) {
  115. DWORD dwError = GetLastError();
  116. *phr = AmHresultFromWin32(dwError);
  117. return;
  118. }
  119. SetThreadPriority(m_hThread, dwPriority);
  120. } else {
  121. DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
  122. }
  123. }
  124. //
  125. // COutputQueuee Destructor :
  126. //
  127. // Free all resources -
  128. //
  129. // Thread,
  130. // Batched samples
  131. //
  132. COutputQueue::~COutputQueue()
  133. {
  134. DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));
  135. /* Free our pointer */
  136. if (m_pInputPin != NULL) {
  137. m_pInputPin->Release();
  138. }
  139. if (m_hThread != NULL) {
  140. {
  141. CAutoLock lck(this);
  142. m_bTerminate = TRUE;
  143. m_hr = S_FALSE;
  144. NotifyThread();
  145. }
  146. DbgWaitForSingleObject(m_hThread);
  147. EXECUTE_ASSERT(CloseHandle(m_hThread));
  148. // The thread frees the samples when asked to terminate
  149. ASSERT(m_List->GetCount() == 0);
  150. delete m_List;
  151. } else {
  152. FreeSamples();
  153. }
  154. if (m_hSem != NULL) {
  155. EXECUTE_ASSERT(CloseHandle(m_hSem));
  156. }
  157. delete [] m_ppSamples;
  158. }
  159. //
  160. // Call the real thread proc as a member function
  161. //
  162. DWORD WINAPI COutputQueue::InitialThreadProc(__in LPVOID pv)
  163. {
  164. HRESULT hrCoInit = CAMThread::CoInitializeHelper();
  165. COutputQueue *pSampleQueue = (COutputQueue *)pv;
  166. DWORD dwReturn = pSampleQueue->ThreadProc();
  167. if(hrCoInit == S_OK) {
  168. CoUninitialize();
  169. }
  170. return dwReturn;
  171. }
  172. //
  173. // Thread sending the samples downstream :
  174. //
  175. // When there is nothing to do the thread sets m_lWaiting (while
  176. // holding the critical section) and then waits for m_hSem to be
  177. // set (not holding the critical section)
  178. //
  179. DWORD COutputQueue::ThreadProc()
  180. {
  181. while (TRUE) {
  182. BOOL bWait = FALSE;
  183. IMediaSample *pSample;
  184. LONG lNumberToSend; // Local copy
  185. NewSegmentPacket* ppacket;
  186. //
  187. // Get a batch of samples and send it if possible
  188. // In any case exit the loop if there is a control action
  189. // requested
  190. //
  191. {
  192. CAutoLock lck(this);
  193. while (TRUE) {
  194. if (m_bTerminate) {
  195. FreeSamples();
  196. return 0;
  197. }
  198. if (m_bFlushing) {
  199. FreeSamples();
  200. SetEvent(m_evFlushComplete);
  201. }
  202. // Get a sample off the list
  203. pSample = m_List->RemoveHead();
  204. // inform derived class we took something off the queue
  205. if (m_hEventPop) {
  206. //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
  207. SetEvent(m_hEventPop);
  208. }
  209. if (pSample != NULL &&
  210. !IsSpecialSample(pSample)) {
  211. // If its just a regular sample just add it to the batch
  212. // and exit the loop if the batch is full
  213. m_ppSamples[m_nBatched++] = pSample;
  214. if (m_nBatched == m_lBatchSize) {
  215. break;
  216. }
  217. } else {
  218. // If there was nothing in the queue and there's nothing
  219. // to send (either because there's nothing or the batch
  220. // isn't full) then prepare to wait
  221. if (pSample == NULL &&
  222. (m_bBatchExact || m_nBatched == 0)) {
  223. // Tell other thread to set the event when there's
  224. // something do to
  225. ASSERT(m_lWaiting == 0);
  226. m_lWaiting++;
  227. bWait = TRUE;
  228. } else {
  229. // We break out of the loop on SEND_PACKET unless
  230. // there's nothing to send
  231. if (pSample == SEND_PACKET && m_nBatched == 0) {
  232. continue;
  233. }
  234. if (pSample == NEW_SEGMENT) {
  235. // now we need the parameters - we are
  236. // guaranteed that the next packet contains them
  237. ppacket = (NewSegmentPacket *) m_List->RemoveHead();
  238. // we took something off the queue
  239. if (m_hEventPop) {
  240. //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
  241. SetEvent(m_hEventPop);
  242. }
  243. ASSERT(ppacket);
  244. }
  245. // EOS_PACKET falls through here and we exit the loop
  246. // In this way it acts like SEND_PACKET
  247. }
  248. break;
  249. }
  250. }
  251. if (!bWait) {
  252. // We look at m_nBatched from the client side so keep
  253. // it up to date inside the critical section
  254. lNumberToSend = m_nBatched; // Local copy
  255. m_nBatched = 0;
  256. }
  257. }
  258. // Wait for some more data
  259. if (bWait) {
  260. DbgWaitForSingleObject(m_hSem);
  261. continue;
  262. }
  263. // OK - send it if there's anything to send
  264. // We DON'T check m_bBatchExact here because either we've got
  265. // a full batch or we dropped through because we got
  266. // SEND_PACKET or EOS_PACKET - both of which imply we should
  267. // flush our batch
  268. if (lNumberToSend != 0) {
  269. long nProcessed;
  270. if (m_hr == S_OK) {
  271. ASSERT(!m_bFlushed);
  272. HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  273. lNumberToSend,
  274. &nProcessed);
  275. /* Don't overwrite a flushing state HRESULT */
  276. CAutoLock lck(this);
  277. if (m_hr == S_OK) {
  278. m_hr = hr;
  279. }
  280. ASSERT(!m_bFlushed);
  281. }
  282. while (lNumberToSend != 0) {
  283. m_ppSamples[--lNumberToSend]->Release();
  284. }
  285. if (m_hr != S_OK) {
  286. // In any case wait for more data - S_OK just
  287. // means there wasn't an error
  288. DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
  289. m_hr));
  290. }
  291. }
  292. // Check for end of stream
  293. if (pSample == EOS_PACKET) {
  294. // We don't send even end of stream on if we've previously
  295. // returned something other than S_OK
  296. // This is because in that case the pin which returned
  297. // something other than S_OK should have either sent
  298. // EndOfStream() or notified the filter graph
  299. if (m_hr == S_OK) {
  300. DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  301. HRESULT hr = m_pPin->EndOfStream();
  302. if (FAILED(hr)) {
  303. DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  304. }
  305. }
  306. }
  307. // Data from a new source
  308. if (pSample == RESET_PACKET) {
  309. m_hr = S_OK;
  310. SetEvent(m_evFlushComplete);
  311. }
  312. if (pSample == NEW_SEGMENT) {
  313. m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
  314. delete ppacket;
  315. }
  316. }
  317. }
  318. // Send batched stuff anyway
  319. void COutputQueue::SendAnyway()
  320. {
  321. if (!IsQueued()) {
  322. // m_bSendAnyway is a private parameter checked in ReceiveMultiple
  323. m_bSendAnyway = TRUE;
  324. LONG nProcessed;
  325. ReceiveMultiple(NULL, 0, &nProcessed);
  326. m_bSendAnyway = FALSE;
  327. } else {
  328. CAutoLock lck(this);
  329. QueueSample(SEND_PACKET);
  330. NotifyThread();
  331. }
  332. }
  333. void
  334. COutputQueue::NewSegment(
  335. REFERENCE_TIME tStart,
  336. REFERENCE_TIME tStop,
  337. double dRate)
  338. {
  339. if (!IsQueued()) {
  340. if (S_OK == m_hr) {
  341. if (m_bBatchExact) {
  342. SendAnyway();
  343. }
  344. m_pPin->NewSegment(tStart, tStop, dRate);
  345. }
  346. } else {
  347. if (m_hr == S_OK) {
  348. //
  349. // we need to queue the new segment to appear in order in the
  350. // data, but we need to pass parameters to it. Rather than
  351. // take the hit of wrapping every single sample so we can tell
  352. // special ones apart, we queue special pointers to indicate
  353. // special packets, and we guarantee (by holding the
  354. // critical section) that the packet immediately following a
  355. // NEW_SEGMENT value is a NewSegmentPacket containing the
  356. // parameters.
  357. NewSegmentPacket * ppack = new NewSegmentPacket;
  358. if (ppack == NULL) {
  359. return;
  360. }
  361. ppack->tStart = tStart;
  362. ppack->tStop = tStop;
  363. ppack->dRate = dRate;
  364. CAutoLock lck(this);
  365. QueueSample(NEW_SEGMENT);
  366. QueueSample( (IMediaSample*) ppack);
  367. NotifyThread();
  368. }
  369. }
  370. }
  371. //
  372. // End of Stream is queued to output device
  373. //
  374. void COutputQueue::EOS()
  375. {
  376. CAutoLock lck(this);
  377. if (!IsQueued()) {
  378. if (m_bBatchExact) {
  379. SendAnyway();
  380. }
  381. if (m_hr == S_OK) {
  382. DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  383. m_bFlushed = FALSE;
  384. HRESULT hr = m_pPin->EndOfStream();
  385. if (FAILED(hr)) {
  386. DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  387. }
  388. }
  389. } else {
  390. if (m_hr == S_OK) {
  391. m_bFlushed = FALSE;
  392. QueueSample(EOS_PACKET);
  393. NotifyThread();
  394. }
  395. }
  396. }
  397. //
  398. // Flush all the samples in the queue
  399. //
  400. void COutputQueue::BeginFlush()
  401. {
  402. if (IsQueued()) {
  403. {
  404. CAutoLock lck(this);
  405. // block receives -- we assume this is done by the
  406. // filter in which we are a component
  407. // discard all queued data
  408. m_bFlushing = TRUE;
  409. // Make sure we discard all samples from now on
  410. if (m_hr == S_OK) {
  411. m_hr = S_FALSE;
  412. }
  413. // Optimize so we don't keep calling downstream all the time
  414. if (m_bFlushed && m_bFlushingOpt) {
  415. return;
  416. }
  417. // Make sure we really wait for the flush to complete
  418. m_evFlushComplete.Reset();
  419. NotifyThread();
  420. }
  421. // pass this downstream
  422. m_pPin->BeginFlush();
  423. } else {
  424. // pass downstream first to avoid deadlocks
  425. m_pPin->BeginFlush();
  426. CAutoLock lck(this);
  427. // discard all queued data
  428. m_bFlushing = TRUE;
  429. // Make sure we discard all samples from now on
  430. if (m_hr == S_OK) {
  431. m_hr = S_FALSE;
  432. }
  433. }
  434. }
  435. //
  436. // leave flush mode - pass this downstream
  437. void COutputQueue::EndFlush()
  438. {
  439. {
  440. CAutoLock lck(this);
  441. ASSERT(m_bFlushing);
  442. if (m_bFlushingOpt && m_bFlushed && IsQueued()) {
  443. m_bFlushing = FALSE;
  444. m_hr = S_OK;
  445. return;
  446. }
  447. }
  448. // sync with pushing thread -- done in BeginFlush
  449. // ensure no more data to go downstream -- done in BeginFlush
  450. //
  451. // Because we are synching here there is no need to hold the critical
  452. // section (in fact we'd deadlock if we did!)
  453. if (IsQueued()) {
  454. m_evFlushComplete.Wait();
  455. } else {
  456. FreeSamples();
  457. }
  458. // Be daring - the caller has guaranteed no samples will arrive
  459. // before EndFlush() returns
  460. m_bFlushing = FALSE;
  461. m_bFlushed = TRUE;
  462. // call EndFlush on downstream pins
  463. m_pPin->EndFlush();
  464. m_hr = S_OK;
  465. }
  466. // COutputQueue::QueueSample
  467. //
  468. // private method to Send a sample to the output queue
  469. // The critical section MUST be held when this is called
  470. void COutputQueue::QueueSample(IMediaSample *pSample)
  471. {
  472. if (NULL == m_List->AddTail(pSample)) {
  473. if (!IsSpecialSample(pSample)) {
  474. pSample->Release();
  475. }
  476. }
  477. }
  478. //
  479. // COutputQueue::Receive()
  480. //
  481. // Send a single sample by the multiple sample route
  482. // (NOTE - this could be optimized if necessary)
  483. //
  484. // On return the sample will have been Release()'d
  485. //
  486. HRESULT COutputQueue::Receive(IMediaSample *pSample)
  487. {
  488. LONG nProcessed;
  489. return ReceiveMultiple(&pSample, 1, &nProcessed);
  490. }
  491. //
  492. // COutputQueue::ReceiveMultiple()
  493. //
  494. // Send a set of samples to the downstream pin
  495. //
  496. // ppSamples - array of samples
  497. // nSamples - how many
  498. // nSamplesProcessed - How many were processed
  499. //
  500. // On return all samples will have been Release()'d
  501. //
  502. HRESULT COutputQueue::ReceiveMultiple (
  503. __in_ecount(nSamples) IMediaSample **ppSamples,
  504. long nSamples,
  505. __out long *nSamplesProcessed)
  506. {
  507. if (nSamples < 0) {
  508. return E_INVALIDARG;
  509. }
  510. CAutoLock lck(this);
  511. // Either call directly or queue up the samples
  512. if (!IsQueued()) {
  513. // If we already had a bad return code then just return
  514. if (S_OK != m_hr) {
  515. // If we've never received anything since the last Flush()
  516. // and the sticky return code is not S_OK we must be
  517. // flushing
  518. // ((!A || B) is equivalent to A implies B)
  519. ASSERT(!m_bFlushed || m_bFlushing);
  520. // We're supposed to Release() them anyway!
  521. *nSamplesProcessed = 0;
  522. for (int i = 0; i < nSamples; i++) {
  523. DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
  524. nSamples, m_hr));
  525. ppSamples[i]->Release();
  526. }
  527. return m_hr;
  528. }
  529. //
  530. // If we're flushing the sticky return code should be S_FALSE
  531. //
  532. ASSERT(!m_bFlushing);
  533. m_bFlushed = FALSE;
  534. ASSERT(m_nBatched < m_lBatchSize);
  535. ASSERT(m_nBatched == 0 || m_bBatchExact);
  536. // Loop processing the samples in batches
  537. LONG iLost = 0;
  538. long iDone = 0;
  539. for (iDone = 0;
  540. iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
  541. ) {
  542. //pragma message (REMIND("Implement threshold scheme"))
  543. ASSERT(m_nBatched < m_lBatchSize);
  544. if (iDone < nSamples) {
  545. m_ppSamples[m_nBatched++] = ppSamples[iDone++];
  546. }
  547. if (m_nBatched == m_lBatchSize ||
  548. nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) {
  549. LONG nDone;
  550. DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
  551. m_nBatched));
  552. if (m_hr == S_OK) {
  553. m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  554. m_nBatched,
  555. &nDone);
  556. } else {
  557. nDone = 0;
  558. }
  559. iLost += m_nBatched - nDone;
  560. for (LONG i = 0; i < m_nBatched; i++) {
  561. m_ppSamples[i]->Release();
  562. }
  563. m_nBatched = 0;
  564. }
  565. }
  566. *nSamplesProcessed = iDone - iLost;
  567. if (*nSamplesProcessed < 0) {
  568. *nSamplesProcessed = 0;
  569. }
  570. return m_hr;
  571. } else {
  572. /* We're sending to our thread */
  573. if (m_hr != S_OK) {
  574. *nSamplesProcessed = 0;
  575. DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
  576. nSamples, m_hr));
  577. for (int i = 0; i < nSamples; i++) {
  578. ppSamples[i]->Release();
  579. }
  580. return m_hr;
  581. }
  582. m_bFlushed = FALSE;
  583. for (long i = 0; i < nSamples; i++) {
  584. QueueSample(ppSamples[i]);
  585. }
  586. *nSamplesProcessed = nSamples;
  587. if (!m_bBatchExact ||
  588. m_nBatched + m_List->GetCount() >= m_lBatchSize) {
  589. NotifyThread();
  590. }
  591. return S_OK;
  592. }
  593. }
  594. // Get ready for new data - cancels sticky m_hr
  595. void COutputQueue::Reset()
  596. {
  597. if (!IsQueued()) {
  598. m_hr = S_OK;
  599. } else {
  600. {
  601. CAutoLock lck(this);
  602. QueueSample(RESET_PACKET);
  603. NotifyThread();
  604. }
  605. m_evFlushComplete.Wait();
  606. }
  607. }
  608. // Remove and Release() all queued and Batched samples
  609. void COutputQueue::FreeSamples()
  610. {
  611. CAutoLock lck(this);
  612. if (IsQueued()) {
  613. while (TRUE) {
  614. IMediaSample *pSample = m_List->RemoveHead();
  615. // inform derived class we took something off the queue
  616. if (m_hEventPop) {
  617. //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
  618. SetEvent(m_hEventPop);
  619. }
  620. if (pSample == NULL) {
  621. break;
  622. }
  623. if (!IsSpecialSample(pSample)) {
  624. pSample->Release();
  625. } else {
  626. if (pSample == NEW_SEGMENT) {
  627. // Free NEW_SEGMENT packet
  628. NewSegmentPacket *ppacket =
  629. (NewSegmentPacket *) m_List->RemoveHead();
  630. // inform derived class we took something off the queue
  631. if (m_hEventPop) {
  632. //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
  633. SetEvent(m_hEventPop);
  634. }
  635. ASSERT(ppacket != NULL);
  636. delete ppacket;
  637. }
  638. }
  639. }
  640. }
  641. for (int i = 0; i < m_nBatched; i++) {
  642. m_ppSamples[i]->Release();
  643. }
  644. m_nBatched = 0;
  645. }
  646. // Notify the thread if there is something to do
  647. //
  648. // The critical section MUST be held when this is called
  649. void COutputQueue::NotifyThread()
  650. {
  651. // Optimize - no need to signal if it's not waiting
  652. ASSERT(IsQueued());
  653. if (m_lWaiting) {
  654. ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
  655. m_lWaiting = 0;
  656. }
  657. }
  658. // See if there's any work to do
  659. // Returns
  660. // TRUE if there is nothing on the queue and nothing in the batch
  661. // and all data has been sent
  662. // FALSE otherwise
  663. //
  664. BOOL COutputQueue::IsIdle()
  665. {
  666. CAutoLock lck(this);
  667. // We're idle if
  668. // there is no thread (!IsQueued()) OR
  669. // the thread is waiting for more work (m_lWaiting != 0)
  670. // AND
  671. // there's nothing in the current batch (m_nBatched == 0)
  672. if (IsQueued() && m_lWaiting == 0 || m_nBatched != 0) {
  673. return FALSE;
  674. } else {
  675. // If we're idle it shouldn't be possible for there
  676. // to be anything on the work queue
  677. ASSERT(!IsQueued() || m_List->GetCount() == 0);
  678. return TRUE;
  679. }
  680. }
  681. void COutputQueue::SetPopEvent(HANDLE hEvent)
  682. {
  683. m_hEventPop = hEvent;
  684. }