// LockFreeRingBuffer.cpp
  1. /*
  2. * LockFreeRingBuffer.cpp
  3. * Lock-free ring buffer data structure.
  4. * One thread can be consumer and one can be producer
  5. *
  6. * Created by Ben Allison on 11/10/07.
  7. * Copyright 2007 Nullsoft, Inc. All rights reserved.
  8. *
  9. */
  10. #include "LockFreeRingBuffer.h"
  11. #include "foundation/types.h"
  12. #include "foundation/atomics.h"
  13. #include "foundation/error.h"
  14. #include <stdlib.h>
  15. #include <string.h>
  16. #include <stdint.h>
  17. #define MIN(a,b) ((a<b)?(a):(b))
  18. LockFreeRingBuffer::LockFreeRingBuffer()
  19. {
  20. ringBuffer=0;
  21. ringBufferSize=0;
  22. ringBufferUsed=0;
  23. ringWritePosition=0;
  24. ringReadPosition=0;
  25. }
  26. LockFreeRingBuffer::~LockFreeRingBuffer()
  27. {
  28. free(ringBuffer);
  29. ringBuffer=0;
  30. }
  31. void LockFreeRingBuffer::Reset()
  32. {
  33. free(ringBuffer);
  34. ringBuffer=0;
  35. }
  36. bool LockFreeRingBuffer::reserve(size_t bytes)
  37. {
  38. void *new_ring_buffer = realloc(ringBuffer, bytes);
  39. if (!new_ring_buffer)
  40. return false;
  41. ringBufferSize=bytes;
  42. ringBuffer = (char *)new_ring_buffer;
  43. clear();
  44. return true;
  45. }
  46. int LockFreeRingBuffer::expand(size_t bytes)
  47. {
  48. if (bytes > ringBufferSize)
  49. {
  50. char *new_buffer = (char *)realloc(ringBuffer, bytes);
  51. if (!new_buffer)
  52. return NErr_OutOfMemory;
  53. size_t write_offset = ringReadPosition-ringBuffer;
  54. size_t read_offset = ringWritePosition-ringBuffer;
  55. /* update write pointer for the new buffer */
  56. ringWritePosition = new_buffer + write_offset;
  57. if (write_offset > read_offset || !ringBufferUsed) /* ringBufferUsed will resolve the ambiguity when ringWritePosition == ringReadPosition */
  58. {
  59. /* the ring buffer looks like [ RXXXW ], so we don't need to move anything.
  60. Just update the read pointer */
  61. ringReadPosition = new_buffer + write_offset;
  62. }
  63. else
  64. {
  65. /* [XXW RXX] needs to become [XXW RXX] */
  66. size_t end_bytes = ringBufferSize-write_offset; // number of bytes that we need to relocate (the RXX portion)
  67. char *new_read_pointer = &new_buffer[bytes - end_bytes];
  68. memmove(new_read_pointer, ringReadPosition, end_bytes);
  69. ringReadPosition = new_read_pointer; /* update read pointer */
  70. }
  71. ringBufferSize=bytes;
  72. ringBuffer = new_buffer;
  73. #if defined(__ARM_ARCH_7A__)
  74. __asm__ __volatile__ ("dmb" : : : "memory");
  75. #endif
  76. return NErr_Success;
  77. }
  78. else
  79. return NErr_NoAction;
  80. }
  81. bool LockFreeRingBuffer::empty() const
  82. {
  83. return (ringBufferUsed==0);
  84. }
/* Consumer-side read: copies up to `len` bytes into `dest` and consumes
   them. Returns the number of bytes actually copied (may be less than
   `len` when the buffer holds less). Safe against ONE concurrent producer:
   ringBufferUsed is snapshotted once (the producer can only grow it), and
   is decremented atomically only AFTER the bytes have been copied out, so
   the producer never reuses space we are still reading from. */
size_t LockFreeRingBuffer::read(void *dest, size_t len)
{
	int8_t *out = (int8_t *)dest; // lets us do pointer math easier
	size_t toCopy=ringBufferUsed; // snapshot of the bytes available to us
	if (toCopy > len) toCopy = len;
	size_t copied=0;
	len-=toCopy;
	// read to the end of the ring buffer
	size_t end = ringBufferSize-(ringReadPosition-ringBuffer);
	size_t read1 = MIN(end, toCopy);
	memcpy(out, ringReadPosition, read1);
#if defined(__ARM_ARCH_7A__)
	/* data memory barrier: the copy must complete before the freed space
	   is published to the producer via the atomic subtract below */
	__asm__ __volatile__ ("dmb" : : : "memory");
#endif
	copied+=read1;
	ringReadPosition+=read1;
	if (ringReadPosition == ringBuffer + ringBufferSize)
		ringReadPosition=ringBuffer; // wrap to the start of the buffer
	// update positions
	nx_atomic_sub(read1, &ringBufferUsed);
	toCopy-=read1;
	out = (int8_t *)out+read1;
	// see if we still have more to read after wrapping around
	if (toCopy)
	{
		memcpy(out, ringReadPosition, toCopy);
#if defined(__ARM_ARCH_7A__)
		/* same ordering requirement as above for the wrapped portion */
		__asm__ __volatile__ ("dmb" : : : "memory");
#endif
		copied+=toCopy;
		ringReadPosition+=toCopy;
		nx_atomic_sub(toCopy, &ringBufferUsed);
		if (ringReadPosition == ringBuffer + ringBufferSize)
			ringReadPosition=ringBuffer;
	}
	return copied;
}
/* Advances the read cursor to `position`, a token previously obtained from
   read_position() (the read pointer cast to size_t - NOT a byte offset).
   A negative pointer difference means the target sits before the current
   cursor, i.e. we wrapped, so one full buffer length is added back.
   Returns the number of bytes actually discarded by advance(). */
size_t LockFreeRingBuffer::advance_to(size_t position)
{
	intptr_t bytes_to_flush = (intptr_t)(position - (size_t)ringReadPosition);
	if (bytes_to_flush < 0) // wrapped: distance measured around the ring
		bytes_to_flush += ringBufferSize;
	return advance(bytes_to_flush);
}
/* Non-consuming random-access read: copies up to `len` bytes that sit
   `offset` bytes past the current read position into `dest`. The shared
   read cursor and used count are left untouched; all walking is done on a
   local cursor copy. Returns the number of bytes copied (less than `len`
   when fewer than offset+len bytes are buffered). */
size_t LockFreeRingBuffer::at(size_t offset, void *dest, size_t len) const
{
	size_t toCopy=ringBufferUsed; // snapshot of buffered byte count
	// make a local copy of this so we don't blow the original
	char *ringReadPosition = this->ringReadPosition;
	/* --- do a "dummy read" to deal with the offset request --- */
	size_t dummy_end = ringBufferSize-(ringReadPosition-ringBuffer);
	offset = MIN(toCopy, offset); // can't seek past the buffered data
	size_t read0 = MIN(dummy_end, offset); // skip within the first span
	ringReadPosition+=read0;
	if (ringReadPosition == ringBuffer + ringBufferSize)
		ringReadPosition=ringBuffer; // wrapped to the front
	// update positions
	toCopy-=read0;
	offset-=read0;
	// do second-half read (wraparound)
	if (offset)
	{
		ringReadPosition+=offset;
		toCopy-=offset;
	}
	// dummy read done
	/* --- set up destination buffer and copy size --- */
	int8_t *out = (int8_t *)dest; // lets us do pointer math easier
	if (toCopy > len) toCopy=len;
	size_t copied=0;
	/* --- read to the end of the ring buffer --- */
	size_t end = ringBufferSize-(ringReadPosition-ringBuffer);
	size_t read1 = MIN(end, toCopy);
	memcpy(out, ringReadPosition, read1);
	copied+=read1;
	ringReadPosition+=read1;
	if (ringReadPosition == ringBuffer + ringBufferSize)
		ringReadPosition=ringBuffer;
	// update positions
	toCopy-=read1;
	out = (int8_t *)out+read1;
	/* --- see if we still have more to read after wrapping around --- */
	if (toCopy)
	{
		memcpy(out, ringReadPosition, toCopy);
		copied+=toCopy;
		ringReadPosition+=toCopy;
	}
	return copied;
}
  175. size_t LockFreeRingBuffer::peek(void *dest, size_t len) const
  176. {
  177. int8_t *out = (int8_t *)dest; // lets us do pointer math easier
  178. size_t toCopy=ringBufferUsed;
  179. if (toCopy > len) toCopy=len;
  180. size_t copied=0;
  181. // make a local copy of this so we don't blow the original
  182. char *ringReadPosition = this->ringReadPosition;
  183. // read to the end of the ring buffer
  184. size_t end = ringBufferSize-(ringReadPosition-ringBuffer);
  185. size_t read1 = MIN(end, toCopy);
  186. memcpy(out, ringReadPosition, read1);
  187. copied+=read1;
  188. ringReadPosition+=read1;
  189. if (ringReadPosition == ringBuffer + ringBufferSize)
  190. ringReadPosition=ringBuffer;
  191. // update positions
  192. toCopy-=read1;
  193. out = (int8_t *)out+read1;
  194. // see if we still have more to read after wrapping around
  195. if (toCopy)
  196. {
  197. memcpy(out, ringReadPosition, toCopy);
  198. copied+=toCopy;
  199. ringReadPosition+=toCopy;
  200. }
  201. return copied;
  202. }
/* Consumer-side skip: discards up to `len` buffered bytes without copying
   them, advancing the read cursor. Returns the number of bytes actually
   discarded. The leading barrier orders this against any prior direct
   reads of the buffer contents (e.g. via get_read_buffer) before the
   space is released to the producer. */
size_t LockFreeRingBuffer::advance(size_t len)
{
#if defined(__ARM_ARCH_7A__)
	/* make sure any reads of the buffered data happen-before we free the
	   space via the atomic subtracts below */
	__asm__ __volatile__ ("dmb" : : : "memory");
#endif
	size_t toCopy=ringBufferUsed; // snapshot; producer can only grow it
	if (toCopy>len) toCopy=len;
	size_t copied=0;
	len-=toCopy;
	// read to the end of the ring buffer
	size_t end = ringBufferSize-(ringReadPosition-ringBuffer);
	size_t read1 = MIN(end, toCopy);
	copied+=read1;
	ringReadPosition+=read1;
	if (ringReadPosition == ringBuffer + ringBufferSize)
		ringReadPosition=ringBuffer; // wrap to the start
	// update positions
	toCopy-=read1;
	nx_atomic_sub(read1, &ringBufferUsed);
	// see if we still have more to read after wrapping around
	if (toCopy)
	{
		copied+=toCopy;
		ringReadPosition+=toCopy;
		nx_atomic_sub(toCopy, &ringBufferUsed);
		if (ringReadPosition == ringBuffer + ringBufferSize)
			ringReadPosition=ringBuffer;
	}
	return copied;
}
  233. size_t LockFreeRingBuffer::avail() const
  234. {
  235. return ringBufferSize - ringBufferUsed;
  236. }
/* Producer-side write: copies up to `bytes` bytes from `buffer` into the
   ring. Returns the number of bytes actually stored (less than `bytes`
   when the buffer lacks space). Safe against ONE concurrent consumer:
   ringBufferUsed is snapshotted once (the consumer can only shrink it),
   and is incremented atomically only AFTER the data has been copied in,
   so the consumer never reads bytes that aren't fully written yet. */
size_t LockFreeRingBuffer::write(const void *buffer, size_t bytes)
{
	size_t used=ringBufferUsed; // snapshot; consumer can only shrink it
	size_t avail = ringBufferSize - used;
	bytes = MIN(avail, bytes);
	// write to the end of the ring buffer
	size_t end = ringBufferSize-(ringWritePosition-ringBuffer);
	size_t copied=0;
	size_t write1 = MIN(end, bytes);
	memcpy(ringWritePosition, buffer, write1);
#if defined(__ARM_ARCH_7A__)
	/* data memory barrier: the copy must complete before the new bytes
	   are published to the consumer via the atomic add below */
	__asm__ __volatile__ ("dmb" : : : "memory");
#endif
	copied+=write1;
	ringWritePosition+=write1;
	if (ringWritePosition == ringBuffer + ringBufferSize)
		ringWritePosition=ringBuffer; // wrap to the start of the buffer
	// update positions
	nx_atomic_add(write1, &ringBufferUsed);
	bytes-=write1;
	buffer = (const int8_t *)buffer+write1;
	// see if we still have more to write after wrapping around
	if (bytes)
	{
		memcpy(ringWritePosition, buffer, bytes);
#if defined(__ARM_ARCH_7A__)
		/* same ordering requirement as above for the wrapped portion */
		__asm__ __volatile__ ("dmb" : : : "memory");
#endif
		copied+=bytes;
		ringWritePosition+=bytes;
		nx_atomic_add(bytes, &ringBufferUsed);
		if (ringWritePosition == ringBuffer + ringBufferSize)
			ringWritePosition=ringBuffer;
	}
	return copied;
}
/* Producer-side commit: publishes up to `bytes` bytes that were already
   written in place (e.g. into the region handed out by get_write_buffer),
   advancing the write cursor WITHOUT copying. Returns the number of bytes
   actually committed. The barrier orders the caller's in-place writes
   before the atomic add that makes them visible to the consumer. */
size_t LockFreeRingBuffer::update(size_t bytes)
{
	size_t used=ringBufferUsed; // snapshot; consumer can only shrink it
	size_t avail = ringBufferSize - used;
	bytes = MIN(avail, bytes);
	// write to the end of the ring buffer
	size_t end = ringBufferSize-(ringWritePosition-ringBuffer);
	size_t copied=0;
	size_t write1 = MIN(end, bytes);
#if defined(__ARM_ARCH_7A__)
	/* make the caller's in-place writes visible before publishing them */
	__asm__ __volatile__ ("dmb" : : : "memory");
#endif
	copied+=write1;
	ringWritePosition+=write1;
	if (ringWritePosition == ringBuffer + ringBufferSize)
		ringWritePosition=ringBuffer; // wrap to the start of the buffer
	// update positions
	nx_atomic_add(write1, &ringBufferUsed);
	bytes-=write1;
	// see if we still have more to write after wrapping around
	if (bytes)
	{
		/* no need for memory barrier here, we haven't written anything in the interim */
		copied+=bytes;
		ringWritePosition+=bytes;
		nx_atomic_add(bytes, &ringBufferUsed);
		if (ringWritePosition == ringBuffer + ringBufferSize)
			ringWritePosition=ringBuffer;
	}
	return copied;
}
  304. void LockFreeRingBuffer::get_write_buffer(size_t bytes, void **buffer, size_t *bytes_available)
  305. {
  306. size_t used=ringBufferUsed;
  307. size_t avail = ringBufferSize - used;
  308. bytes = MIN(avail, bytes);
  309. // can only write to the end of the ring buffer
  310. size_t end = ringBufferSize-(ringWritePosition-ringBuffer);
  311. *bytes_available = MIN(end, bytes);
  312. *buffer = ringWritePosition;
  313. }
  314. void LockFreeRingBuffer::get_read_buffer(size_t bytes, const void **buffer, size_t *bytes_available)
  315. {
  316. size_t toCopy=ringBufferUsed;
  317. if (toCopy > bytes) toCopy=bytes;
  318. // read to the end of the ring buffer
  319. size_t end = ringBufferSize-(ringReadPosition-ringBuffer);
  320. *bytes_available = MIN(end, toCopy);
  321. *buffer = ringReadPosition;
  322. }
  323. size_t LockFreeRingBuffer::size() const
  324. {
  325. return ringBufferUsed;
  326. }
/* Discards all buffered data and rewinds both cursors to the start of the
   backing store. Only safe when no other thread is concurrently reading
   or writing. */
void LockFreeRingBuffer::clear()
{
	nx_atomic_write(0, &ringBufferUsed);
	ringWritePosition=ringBuffer;
	ringReadPosition=ringBuffer;
}
/* Opaque token for the current write cursor: the pointer cast to size_t,
   NOT a byte offset or count. Intended for position bookkeeping
   (cf. read_position()/advance_to()). */
size_t LockFreeRingBuffer::write_position() const
{
	return (size_t)ringWritePosition;
}
/* Opaque token for the current read cursor: the pointer cast to size_t,
   NOT a byte offset or count. Pass a saved value to advance_to() to skip
   ahead to that point. */
size_t LockFreeRingBuffer::read_position() const
{
	return (size_t)ringReadPosition;
}