1
0

NXFileProgressiveDownloader.cpp 18 KB


  1. #include "NXFileObject.h"
  2. #include "nu/ProgressTracker.h"
  3. #include "nx/nxthread.h"
  4. #include "nx/nxsleep.h"
  5. #include "jnetlib/jnetlib.h"
  6. #include "../nswasabi/AutoCharNX.h"
  7. #include "nswasabi/ReferenceCounted.h"
  8. #include "nu/MessageLoop.h"
  9. #include <time.h>
  10. #include <new>
  11. #include "../../../WAT/WAT.h"
  12. /* TODO: benski> test this with a server that does not return content-length. I bet we could get it to work */
  13. /* TODO: benski> on windows, we can use a single CreateFile HANDLE for both reading and writing
  14. and use ReadFile(..., &overlapped) to maintain two separate file pointers
  15. this should improve performance as they will share the same cache
  16. _might_ have to use async I/O to get it to work (but use it synchronously by waiting on the handle after making the call
  17. */
  18. #define HTTP_BUFFER_SIZE 65536
  19. class NXFileObject_ProgressiveDownloader;
  20. enum
  21. {
  22. MESSAGE_KILL,
  23. MESSAGE_SEEK,
  24. MESSAGE_SIZE,
  25. MESSAGE_ERROR,
  26. MESSAGE_CLOSED,
  27. MESSAGE_CONNECTED,
  28. };
  29. char MessageString[6][10] =
  30. {
  31. "Kill",
  32. "Seek",
  33. "Size",
  34. "Error",
  35. "Closed",
  36. "Connected"
  37. };
  38. struct seek_message_t : public nu::message_node_t
  39. {
  40. uint64_t start;
  41. uint64_t end;
  42. };
  43. struct size_message_t : public nu::message_node_t
  44. {
  45. uint64_t size;
  46. };
  47. struct error_message_t : public nu::message_node_t
  48. {
  49. int error_code;
  50. };
  51. /* This class represents the thread that's actually downloading the content from the server */
  52. class ProgressiveDownload
  53. {
  54. public:
  55. ProgressiveDownload(ProgressTracker &progress_tracker, NXFileObject_ProgressiveDownloader &parent);
  56. ~ProgressiveDownload();
  57. ns_error_t Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent, nx_uri_t temp_uri);
  58. void Seek(uint64_t start, uint64_t end);
  59. void Close();
  60. private:
  61. /* These functions are called on the local thread */
  62. /* These functions run on the download thread */
  63. static nx_thread_return_t NXTHREADCALL _ProgressiveThread(nx_thread_parameter_t param) { return ((ProgressiveDownload *)param)->ProgressiveThread(); }
  64. nx_thread_return_t NXTHREADCALL ProgressiveThread();
  65. int Connect();
  66. void Internal_Write(const void *data, size_t data_len);
  67. int Wait(int milliseconds);
  68. ns_error_t SetupConnection(uint64_t start_position, uint64_t end_position);
  69. int DoRead(void *buffer, size_t bufferlen);
  70. void ProcessMessage(nu::message_node_t *message);
  71. private:
  72. ProgressTracker &progress_tracker;
  73. NXFileObject_ProgressiveDownloader &parent;
  74. nx_uri_t temp_filename, url;
  75. FILE *progressive_file_write;
  76. jnl_http_t http;
  77. char *user_agent;
  78. nx_thread_t download_thread;
  79. nu::MessageLoop message_loop;
  80. uint64_t file_size;
  81. int killswitch;
  82. };
  83. class NXFileObject_ProgressiveDownloader: public NXFileObject
  84. {
  85. public:
  86. NXFileObject_ProgressiveDownloader();
  87. ~NXFileObject_ProgressiveDownloader();
  88. ns_error_t Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent);
  89. bool Available(uint64_t size, uint64_t *available);
  90. /* API used by ProgressiveDownload */
  91. void OnFileSize(uint64_t filesize);
  92. void OnConnected();
  93. void OnError(int error_code);
  94. void OnClosed();
  95. private:
  96. /* NXFileObject implementation */
  97. ns_error_t Read(void *buffer, size_t bytes_requested, size_t *bytes_read);
  98. ns_error_t Write(const void *buffer, size_t bytes);
  99. ns_error_t Seek(uint64_t position);
  100. ns_error_t Tell(uint64_t *position);
  101. ns_error_t PeekByte(uint8_t *byte);
  102. ns_error_t Sync();
  103. ns_error_t Truncate();
  104. bool WaitForRead(uint64_t size);
  105. void ProcessMessage(nu::message_node_t *message);
  106. void Wait(unsigned int milliseconds);
  107. ProgressiveDownload download;
  108. ProgressTracker progress_tracker;
  109. FILE *progressive_file_read;
  110. bool end_of_file;
  111. bool connected;
  112. int error_code;
  113. nu::MessageLoop message_loop;
  114. bool closed;
  115. bool need_seek; // if set to true, we need to fseek(position)
  116. };
  117. ProgressiveDownload::ProgressiveDownload(ProgressTracker &progress_tracker, NXFileObject_ProgressiveDownloader &parent) : progress_tracker(progress_tracker), parent(parent)
  118. {
  119. killswitch=0;
  120. url=0;
  121. temp_filename=0;
  122. progressive_file_write=0;
  123. http=0;
  124. user_agent=0;
  125. download_thread=0;
  126. file_size=0;
  127. }
  128. ProgressiveDownload::~ProgressiveDownload()
  129. {
  130. if (download_thread)
  131. {
  132. Close();
  133. NXThreadJoin(download_thread, 0);
  134. }
  135. // TODO: flush messages
  136. if (progressive_file_write)
  137. fclose(progressive_file_write);
  138. NXURIRelease(temp_filename);
  139. NXURIRelease(url);
  140. if (http)
  141. jnl_http_release(http);
  142. free(user_agent);
  143. }
  144. void ProgressiveDownload::Close()
  145. {
  146. nu::message_node_t *message = message_loop.AllocateMessage();
  147. message->message = MESSAGE_KILL;
  148. message_loop.PostMessage(message);
  149. }
  150. void ProgressiveDownload::Seek(uint64_t start, uint64_t end)
  151. {
  152. seek_message_t *message = (seek_message_t *)message_loop.AllocateMessage();
  153. message->message = MESSAGE_SEEK;
  154. message->start = start;
  155. message->end = end;
  156. message_loop.PostMessage(message);
  157. }
  158. ns_error_t ProgressiveDownload::Initialize(nx_uri_t url, jnl_http_t http, const char *user_agent, nx_uri_t temp_filename)
  159. {
  160. this->url = NXURIRetain(url);
  161. this->temp_filename = NXURIRetain(temp_filename);
  162. if (user_agent)
  163. this->user_agent = strdup(user_agent);
  164. this->http = jnl_http_retain(http);
  165. progressive_file_write = NXFile_fopen(temp_filename, nx_file_FILE_readwrite_binary);
  166. if (progressive_file_write == 0)
  167. return NErr_FailedCreate;
  168. return NXThreadCreate(&download_thread, _ProgressiveThread, this);
  169. }
  170. void ProgressiveDownload::ProcessMessage(nu::message_node_t *message)
  171. {
  172. switch(message->message)
  173. {
  174. case MESSAGE_KILL:
  175. killswitch=1;
  176. break;
  177. case MESSAGE_SEEK:
  178. {
  179. seek_message_t *seek_message = (seek_message_t *)message;
  180. char buffer[HTTP_BUFFER_SIZE] = {0};
  181. /* empty out the jnetlib buffer. that might let us be able to avoid this seek */
  182. DoRead(buffer, sizeof(buffer));
  183. uint64_t new_start, new_end;
  184. if (!progress_tracker.Valid(seek_message->start, seek_message->end) /* double check that we actually need to seek */
  185. && !progress_tracker.Seek(seek_message->start, seek_message->end, &new_start, &new_end))
  186. {
  187. int ret = SetupConnection(new_start, new_end);
  188. if (ret == NErr_Success)
  189. ret = Connect();
  190. if (ret != NErr_Success)
  191. {
  192. parent.OnError(ret);
  193. killswitch=1;
  194. break;
  195. }
  196. _fseeki64(progressive_file_write, new_start, SEEK_SET);
  197. }
  198. else
  199. parent.OnConnected();
  200. }
  201. break;
  202. }
  203. message_loop.FreeMessage(message);
  204. }
  205. int ProgressiveDownload::Wait(int milliseconds)
  206. {
  207. for (;;)
  208. {
  209. if (killswitch)
  210. return 1;
  211. nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
  212. if (message)
  213. ProcessMessage(message);
  214. else
  215. break;
  216. }
  217. nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
  218. if (message)
  219. ProcessMessage(message);
  220. return killswitch;
  221. }
  222. ns_error_t ProgressiveDownload::SetupConnection(uint64_t start_position, uint64_t end_position)
  223. {
  224. if (!http)
  225. http = jnl_http_create(HTTP_BUFFER_SIZE, 0);
  226. if (!http)
  227. return NErr_FailedCreate;
  228. jnl_http_reset_headers(http);
  229. if (user_agent)
  230. jnl_http_addheadervalue(http, "User-Agent", user_agent);
  231. if (start_position && start_position != (uint64_t)-1)
  232. {
  233. if (end_position == (uint64_t)-1)
  234. {
  235. char temp[128] = {0};
  236. sprintf(temp, "Range: bytes=%llu-", start_position);
  237. jnl_http_addheader(http, temp);
  238. }
  239. else
  240. {
  241. char temp[128] = {0};
  242. sprintf(temp, "Range: bytes=%llu-%llu", start_position, end_position);
  243. jnl_http_addheader(http, temp);
  244. }
  245. }
  246. jnl_http_addheader(http, "Connection: Close"); // TODO: change if we ever want a persistent connection and downloading in chunks
  247. jnl_http_connect(http, AutoCharUTF8(url), 1, "GET");
  248. return NErr_Success;
  249. }
  250. int ProgressiveDownload::Connect()
  251. {
  252. // TODO: configurable timeout
  253. /* wait for connection */
  254. #ifdef _DEBUG
  255. const int timeout = 15000;
  256. #else
  257. const int timeout = 15;
  258. #endif
  259. time_t start_time = time(0);
  260. int http_status = jnl_http_get_status(http);
  261. while (http_status == HTTPGET_STATUS_CONNECTING || http_status == HTTPGET_STATUS_READING_HEADERS)
  262. {
  263. if (Wait(55) != 0)
  264. return NErr_Interrupted;
  265. int ret = jnl_http_run(http);
  266. if (ret == HTTPGET_RUN_ERROR)
  267. return NErr_ConnectionFailed;
  268. if (start_time + timeout < time(0))
  269. return NErr_TimedOut;
  270. http_status = jnl_http_get_status(http);
  271. }
  272. if (http_status == HTTPGET_STATUS_ERROR)
  273. {
  274. switch(jnl_http_getreplycode(http))
  275. {
  276. case 400:
  277. return NErr_BadRequest;
  278. case 401:
  279. // TODO: deal with this specially
  280. return NErr_Unauthorized;
  281. case 403:
  282. // TODO: deal with this specially?
  283. return NErr_Forbidden;
  284. case 404:
  285. return NErr_NotFound;
  286. case 405:
  287. return NErr_BadMethod;
  288. case 406:
  289. return NErr_NotAcceptable;
  290. case 407:
  291. // TODO: deal with this specially
  292. return NErr_ProxyAuthenticationRequired;
  293. case 408:
  294. return NErr_RequestTimeout;
  295. case 409:
  296. return NErr_Conflict;
  297. case 410:
  298. return NErr_Gone;
  299. case 500:
  300. return NErr_InternalServerError;
  301. case 503:
  302. return NErr_ServiceUnavailable;
  303. default:
  304. return NErr_ConnectionFailed;
  305. }
  306. }
  307. else
  308. {
  309. if (!file_size)
  310. {
  311. // TODO: check range header for actual size
  312. file_size = jnl_http_content_length(http);
  313. parent.OnFileSize(file_size);
  314. }
  315. parent.OnConnected();
  316. return NErr_Success;
  317. }
  318. }
  319. void ProgressiveDownload::Internal_Write(const void *data, size_t data_len)
  320. {
  321. size_t bytes_written = fwrite(data, 1, data_len, progressive_file_write);
  322. fflush(progressive_file_write);
  323. progress_tracker.Write(bytes_written);
  324. }
  325. int ProgressiveDownload::DoRead(void *buffer, size_t bufferlen)
  326. {
  327. int ret = jnl_http_run(http);
  328. size_t bytes_received;
  329. do
  330. {
  331. ret = jnl_http_run(http);
  332. bytes_received = jnl_http_get_bytes(http, buffer, bufferlen);
  333. if (bytes_received)
  334. {
  335. Internal_Write(buffer, bytes_received);
  336. }
  337. /* TODO: benski> should we limit the number of times through this loop?
  338. I'm worried that if data comes in fast enough we might get stuck in this for a long time */
  339. } while (bytes_received == bufferlen);
  340. return ret;
  341. }
  342. nx_thread_return_t ProgressiveDownload::ProgressiveThread()
  343. {
  344. ns_error_t ret;
  345. if (!http)
  346. {
  347. ret = SetupConnection(0, (uint64_t)-1);
  348. if (ret != NErr_Success)
  349. {
  350. parent.OnError(ret);
  351. parent.OnClosed();
  352. return 0;
  353. }
  354. }
  355. ret = Connect();
  356. if (ret != NErr_Success)
  357. {
  358. parent.OnError(ret);
  359. }
  360. else
  361. {
  362. for (;;)
  363. {
  364. if (Wait(10) == 1)
  365. break; // killed!
  366. char buffer[HTTP_BUFFER_SIZE] = {0};
  367. int ret = DoRead(buffer, sizeof(buffer));
  368. if (ret == -1)
  369. break;
  370. else if (ret == HTTPGET_RUN_CONNECTION_CLOSED)
  371. {
  372. if (jnl_http_bytes_available(http) == 0)
  373. {
  374. if (progress_tracker.Valid(0, file_size))
  375. {
  376. // file is completely downloaded. let's gtfo
  377. fclose(progressive_file_write);
  378. progressive_file_write=0;
  379. break;
  380. }
  381. // if we're not completely full then we need to sit around for a potential MESSAGE_SEEK
  382. //while (Wait(100) == 0)
  383. {
  384. // nop
  385. }
  386. }
  387. }
  388. }
  389. }
  390. parent.OnClosed();
  391. return 0;
  392. }
  393. /* ------------------ */
  394. NXFileObject_ProgressiveDownloader::NXFileObject_ProgressiveDownloader() : download(progress_tracker, *this)
  395. {
  396. progressive_file_read=0;
  397. end_of_file=false;
  398. connected=false;
  399. error_code=NErr_Success;
  400. closed = false;
  401. need_seek=false;
  402. position=0;
  403. }
  404. NXFileObject_ProgressiveDownloader::~NXFileObject_ProgressiveDownloader()
  405. {
  406. download.Close();
  407. while (!closed)
  408. Wait(10);
  409. if (progressive_file_read)
  410. fclose(progressive_file_read);
  411. }
  412. void NXFileObject_ProgressiveDownloader::OnConnected()
  413. {
  414. nu::message_node_t *message = message_loop.AllocateMessage();
  415. message->message = MESSAGE_CONNECTED;
  416. message_loop.PostMessage(message);
  417. }
  418. void NXFileObject_ProgressiveDownloader::OnError(int error_code)
  419. {
  420. error_message_t *message = (error_message_t *)message_loop.AllocateMessage();
  421. message->message = MESSAGE_ERROR;
  422. message->error_code = error_code;
  423. message_loop.PostMessage(message);
  424. }
  425. void NXFileObject_ProgressiveDownloader::OnFileSize(uint64_t size)
  426. {
  427. size_message_t *message = (size_message_t *)message_loop.AllocateMessage();
  428. message->message = MESSAGE_SIZE;
  429. message->size = size;
  430. message_loop.PostMessage(message);
  431. }
  432. void NXFileObject_ProgressiveDownloader::OnClosed()
  433. {
  434. nu::message_node_t *message = message_loop.AllocateMessage();
  435. message->message = MESSAGE_CLOSED;
  436. message_loop.PostMessage(message);
  437. }
  438. ns_error_t NXFileObject_ProgressiveDownloader::Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent)
  439. {
  440. ReferenceCountedNXURI temp_uri;
  441. NXURICreateTemp(&temp_uri);
  442. ns_error_t ret = download.Initialize(uri, http, user_agent, temp_uri);
  443. if (ret != NErr_Success)
  444. {
  445. closed=true;
  446. return ret;
  447. }
  448. progressive_file_read = NXFile_fopen(temp_uri, nx_file_FILE_read_binary);
  449. for (;;)
  450. {
  451. Wait(10);
  452. if (error_code != NErr_Success)
  453. return error_code;
  454. if (connected)
  455. break;
  456. }
  457. return NErr_Success;
  458. }
  459. void NXFileObject_ProgressiveDownloader::ProcessMessage(nu::message_node_t *message)
  460. {
  461. switch(message->message)
  462. {
  463. case MESSAGE_ERROR:
  464. {
  465. error_message_t *seek_message = (error_message_t *)message;
  466. error_code = seek_message->error_code;
  467. }
  468. break;
  469. case MESSAGE_CONNECTED:
  470. connected = true;
  471. break;
  472. case MESSAGE_SIZE:
  473. {
  474. size_message_t *seek_message = (size_message_t *)message;
  475. region.end = seek_message->size;
  476. }
  477. break;
  478. case MESSAGE_CLOSED:
  479. closed=true;
  480. break;
  481. }
  482. message_loop.FreeMessage(message);
  483. }
  484. void NXFileObject_ProgressiveDownloader::Wait(unsigned int milliseconds)
  485. {
  486. for (;;)
  487. {
  488. nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
  489. if (message)
  490. ProcessMessage(message);
  491. else
  492. break;
  493. }
  494. nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
  495. if (message)
  496. ProcessMessage(message);
  497. }
  498. bool NXFileObject_ProgressiveDownloader::WaitForRead(uint64_t size)
  499. {
  500. if (progress_tracker.Valid(position, position+size))
  501. return true;
  502. if (need_seek)
  503. {
  504. // give it just a little bit of time to avoid constant reseeks when the download thread is just barely keeping up
  505. Wait(10);
  506. if (progress_tracker.Valid(position, position+size))
  507. return true;
  508. connected=false;
  509. error_code=NErr_Success;
  510. download.Seek(position, (uint64_t)position+size);
  511. for (;;)
  512. {
  513. Wait(10);
  514. if (error_code != NErr_Success)
  515. return false;
  516. if (connected)
  517. break;
  518. }
  519. }
  520. while (!progress_tracker.Valid(position, position+size))
  521. {
  522. Wait(10);
  523. }
  524. return true;
  525. }
  526. ns_error_t NXFileObject_ProgressiveDownloader::Read(void *buffer, size_t bytes_requested, size_t *bytes_read)
  527. {
  528. if (end_of_file || position >= (region.end - region.start))
  529. return NErr_EndOfFile;
  530. // don't allow a read past the end of the file as this will confuse progress_tracker (which doesn't know/care about the file length)
  531. if ((position + bytes_requested) > region.end)
  532. bytes_requested = (size_t)(region.end - position);
  533. if (WaitForRead((uint64_t)bytes_requested) == false)
  534. {
  535. *bytes_read = 0;
  536. return error_code;
  537. }
  538. if (need_seek)
  539. {
  540. _fseeki64(progressive_file_read, position, SEEK_SET);
  541. need_seek=false;
  542. }
  543. /* TODO: benski> if r < bytes_requested, then we need to flush the buffer.
  544. on windows, we can use fflush(progressive_file_read)
  545. on other platforms it's not guaranteed! */
  546. size_t r = fread(buffer, 1, bytes_requested, progressive_file_read);
  547. this->position += r;
  548. *bytes_read = r;
  549. return NErr_Success;
  550. }
  551. ns_error_t NXFileObject_ProgressiveDownloader::Seek(uint64_t new_position)
  552. {
  553. if (new_position >= (region.end - region.start))
  554. {
  555. this->position = region.end - region.start;
  556. end_of_file=true;
  557. }
  558. else
  559. {
  560. if (new_position == position)
  561. return NErr_Success;
  562. position = new_position;
  563. need_seek=true;
  564. end_of_file=false;
  565. }
  566. return NErr_Success;
  567. }
  568. ns_error_t NXFileObject_ProgressiveDownloader::Tell(uint64_t *position)
  569. {
  570. if (end_of_file)
  571. *position = region.end - region.start;
  572. else
  573. *position = this->position - region.start;
  574. return NErr_Success;
  575. }
  576. ns_error_t NXFileObject_ProgressiveDownloader::PeekByte(uint8_t *byte)
  577. {
  578. if (position == region.end)
  579. return NErr_EndOfFile;
  580. // make sure we have enough room
  581. if (WaitForRead((uint64_t)1) == false)
  582. return error_code;
  583. if (need_seek)
  584. {
  585. _fseeki64(progressive_file_read, position, SEEK_SET);
  586. need_seek=false;
  587. }
  588. int read_byte = fgetc(progressive_file_read);
  589. if (read_byte != EOF)
  590. ungetc(read_byte, progressive_file_read);
  591. else
  592. {
  593. /* TODO: benski> if we hit the point, then we actually need to flush the buffer.
  594. on some platforms, fflush(progressive_file_read) will do that, but it's not guaranteed! */
  595. return NErr_EndOfFile;
  596. }
  597. *byte = (uint8_t)read_byte;
  598. return NErr_Success;
  599. }
  600. ns_error_t NXFileObject_ProgressiveDownloader::Sync()
  601. {
  602. return NErr_NotImplemented;
  603. }
  604. ns_error_t NXFileObject_ProgressiveDownloader::Truncate()
  605. {
  606. return NErr_NotImplemented;
  607. }
  608. ns_error_t NXFileObject_ProgressiveDownloader::Write(const void *buffer, size_t bytes)
  609. {
  610. return NErr_NotImplemented;
  611. }
  612. bool NXFileObject_ProgressiveDownloader::Available(uint64_t size, uint64_t *available)
  613. {
  614. uint64_t end = position+size;
  615. if (end > region.end)
  616. end = region.end;
  617. if (position == region.end)
  618. {
  619. if (available)
  620. *available=0;
  621. return true;
  622. }
  623. return progress_tracker.Valid(position, end, available);
  624. }
  625. ns_error_t NXFileOpenProgressiveDownloader(nx_file_t *out_file, nx_uri_t filename, nx_file_FILE_flags_t flags, jnl_http_t http, const char *user_agent)
  626. {
  627. NXFileObject_ProgressiveDownloader *file_object = new (std::nothrow) NXFileObject_ProgressiveDownloader;
  628. if (!file_object)
  629. return NErr_OutOfMemory;
  630. ns_error_t ret = file_object->Initialize(filename, http, user_agent);
  631. if (ret != NErr_Success)
  632. {
  633. delete file_object;
  634. return ret;
  635. }
  636. *out_file = (nx_file_t)file_object;
  637. return NErr_Success;
  638. }
  639. ns_error_t NXFileProgressiveDownloaderAvailable(nx_file_t _f, uint64_t size, uint64_t *available)
  640. {
  641. if (!_f)
  642. return NErr_BadParameter;
  643. NXFileObject_ProgressiveDownloader *f = (NXFileObject_ProgressiveDownloader *)_f;
  644. if (f->Available(size, available))
  645. return NErr_True;
  646. else
  647. return NErr_False;
  648. }