123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749 |
- #include "NXFileObject.h"
- #include "nu/ProgressTracker.h"
- #include "nx/nxthread.h"
- #include "nx/nxsleep.h"
- #include "jnetlib/jnetlib.h"
- #include "../nswasabi/AutoCharNX.h"
- #include "nswasabi/ReferenceCounted.h"
- #include "nu/MessageLoop.h"
- #include <time.h>
- #include <new>
- #include "../../../WAT/WAT.h"
- /* TODO: benski> test this with a server that does not return content-length. I bet we could get it to work */
- /* TODO: benski> on windows, we can use a single CreateFile HANDLE for both reading and writing
- and use ReadFile(..., &overlapped) to maintain two separate file pointers
- this should improve performance as they will share the same cache
- _might_ have to use async I/O to get it to work (but use it synchronously by waiting on the handle after making the call
- */
- #define HTTP_BUFFER_SIZE 65536
- class NXFileObject_ProgressiveDownloader;
- enum
- {
- MESSAGE_KILL,
- MESSAGE_SEEK,
- MESSAGE_SIZE,
- MESSAGE_ERROR,
- MESSAGE_CLOSED,
- MESSAGE_CONNECTED,
- };
- char MessageString[6][10] =
- {
- "Kill",
- "Seek",
- "Size",
- "Error",
- "Closed",
- "Connected"
- };
- struct seek_message_t : public nu::message_node_t
- {
- uint64_t start;
- uint64_t end;
- };
- struct size_message_t : public nu::message_node_t
- {
- uint64_t size;
- };
- struct error_message_t : public nu::message_node_t
- {
- int error_code;
- };
- /* This class represents the thread that's actually downloading the content from the server */
- class ProgressiveDownload
- {
- public:
- ProgressiveDownload(ProgressTracker &progress_tracker, NXFileObject_ProgressiveDownloader &parent);
- ~ProgressiveDownload();
- ns_error_t Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent, nx_uri_t temp_uri);
-
- void Seek(uint64_t start, uint64_t end);
- void Close();
- private:
- /* These functions are called on the local thread */
- /* These functions run on the download thread */
- static nx_thread_return_t NXTHREADCALL _ProgressiveThread(nx_thread_parameter_t param) { return ((ProgressiveDownload *)param)->ProgressiveThread(); }
- nx_thread_return_t NXTHREADCALL ProgressiveThread();
- int Connect();
- void Internal_Write(const void *data, size_t data_len);
- int Wait(int milliseconds);
- ns_error_t SetupConnection(uint64_t start_position, uint64_t end_position);
- int DoRead(void *buffer, size_t bufferlen);
- void ProcessMessage(nu::message_node_t *message);
- private:
- ProgressTracker &progress_tracker;
- NXFileObject_ProgressiveDownloader &parent;
- nx_uri_t temp_filename, url;
- FILE *progressive_file_write;
- jnl_http_t http;
- char *user_agent;
- nx_thread_t download_thread;
- nu::MessageLoop message_loop;
- uint64_t file_size;
- int killswitch;
- };
- class NXFileObject_ProgressiveDownloader: public NXFileObject
- {
- public:
- NXFileObject_ProgressiveDownloader();
- ~NXFileObject_ProgressiveDownloader();
- ns_error_t Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent);
- bool Available(uint64_t size, uint64_t *available);
- /* API used by ProgressiveDownload */
- void OnFileSize(uint64_t filesize);
- void OnConnected();
- void OnError(int error_code);
- void OnClosed();
- private:
- /* NXFileObject implementation */
- ns_error_t Read(void *buffer, size_t bytes_requested, size_t *bytes_read);
- ns_error_t Write(const void *buffer, size_t bytes);
- ns_error_t Seek(uint64_t position);
- ns_error_t Tell(uint64_t *position);
- ns_error_t PeekByte(uint8_t *byte);
- ns_error_t Sync();
- ns_error_t Truncate();
- bool WaitForRead(uint64_t size);
- void ProcessMessage(nu::message_node_t *message);
- void Wait(unsigned int milliseconds);
- ProgressiveDownload download;
- ProgressTracker progress_tracker;
- FILE *progressive_file_read;
- bool end_of_file;
- bool connected;
- int error_code;
- nu::MessageLoop message_loop;
- bool closed;
- bool need_seek; // if set to true, we need to fseek(position)
- };
- ProgressiveDownload::ProgressiveDownload(ProgressTracker &progress_tracker, NXFileObject_ProgressiveDownloader &parent) : progress_tracker(progress_tracker), parent(parent)
- {
- killswitch=0;
- url=0;
- temp_filename=0;
- progressive_file_write=0;
- http=0;
- user_agent=0;
- download_thread=0;
- file_size=0;
- }
- ProgressiveDownload::~ProgressiveDownload()
- {
- if (download_thread)
- {
- Close();
- NXThreadJoin(download_thread, 0);
- }
- // TODO: flush messages
- if (progressive_file_write)
- fclose(progressive_file_write);
- NXURIRelease(temp_filename);
- NXURIRelease(url);
- if (http)
- jnl_http_release(http);
- free(user_agent);
- }
- void ProgressiveDownload::Close()
- {
- nu::message_node_t *message = message_loop.AllocateMessage();
- message->message = MESSAGE_KILL;
- message_loop.PostMessage(message);
- }
- void ProgressiveDownload::Seek(uint64_t start, uint64_t end)
- {
- seek_message_t *message = (seek_message_t *)message_loop.AllocateMessage();
- message->message = MESSAGE_SEEK;
- message->start = start;
- message->end = end;
- message_loop.PostMessage(message);
- }
- ns_error_t ProgressiveDownload::Initialize(nx_uri_t url, jnl_http_t http, const char *user_agent, nx_uri_t temp_filename)
- {
- this->url = NXURIRetain(url);
- this->temp_filename = NXURIRetain(temp_filename);
- if (user_agent)
- this->user_agent = strdup(user_agent);
- this->http = jnl_http_retain(http);
- progressive_file_write = NXFile_fopen(temp_filename, nx_file_FILE_readwrite_binary);
- if (progressive_file_write == 0)
- return NErr_FailedCreate;
- return NXThreadCreate(&download_thread, _ProgressiveThread, this);
- }
- void ProgressiveDownload::ProcessMessage(nu::message_node_t *message)
- {
- switch(message->message)
- {
- case MESSAGE_KILL:
- killswitch=1;
- break;
- case MESSAGE_SEEK:
- {
- seek_message_t *seek_message = (seek_message_t *)message;
- char buffer[HTTP_BUFFER_SIZE] = {0};
- /* empty out the jnetlib buffer. that might let us be able to avoid this seek */
- DoRead(buffer, sizeof(buffer));
-
- uint64_t new_start, new_end;
- if (!progress_tracker.Valid(seek_message->start, seek_message->end) /* double check that we actually need to seek */
- && !progress_tracker.Seek(seek_message->start, seek_message->end, &new_start, &new_end))
- {
- int ret = SetupConnection(new_start, new_end);
- if (ret == NErr_Success)
- ret = Connect();
- if (ret != NErr_Success)
- {
- parent.OnError(ret);
- killswitch=1;
- break;
- }
- _fseeki64(progressive_file_write, new_start, SEEK_SET);
- }
- else
- parent.OnConnected();
- }
- break;
- }
- message_loop.FreeMessage(message);
- }
- int ProgressiveDownload::Wait(int milliseconds)
- {
- for (;;)
- {
- if (killswitch)
- return 1;
- nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
- if (message)
- ProcessMessage(message);
- else
- break;
- }
- nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
- if (message)
- ProcessMessage(message);
-
- return killswitch;
- }
- ns_error_t ProgressiveDownload::SetupConnection(uint64_t start_position, uint64_t end_position)
- {
- if (!http)
- http = jnl_http_create(HTTP_BUFFER_SIZE, 0);
- if (!http)
- return NErr_FailedCreate;
- jnl_http_reset_headers(http);
- if (user_agent)
- jnl_http_addheadervalue(http, "User-Agent", user_agent);
- if (start_position && start_position != (uint64_t)-1)
- {
- if (end_position == (uint64_t)-1)
- {
- char temp[128] = {0};
- sprintf(temp, "Range: bytes=%llu-", start_position);
- jnl_http_addheader(http, temp);
- }
- else
- {
- char temp[128] = {0};
- sprintf(temp, "Range: bytes=%llu-%llu", start_position, end_position);
- jnl_http_addheader(http, temp);
- }
- }
-
- jnl_http_addheader(http, "Connection: Close"); // TODO: change if we ever want a persistent connection and downloading in chunks
- jnl_http_connect(http, AutoCharUTF8(url), 1, "GET");
- return NErr_Success;
- }
- int ProgressiveDownload::Connect()
- {
- // TODO: configurable timeout
- /* wait for connection */
- #ifdef _DEBUG
- const int timeout = 15000;
- #else
- const int timeout = 15;
- #endif
- time_t start_time = time(0);
- int http_status = jnl_http_get_status(http);
- while (http_status == HTTPGET_STATUS_CONNECTING || http_status == HTTPGET_STATUS_READING_HEADERS)
- {
- if (Wait(55) != 0)
- return NErr_Interrupted;
- int ret = jnl_http_run(http);
- if (ret == HTTPGET_RUN_ERROR)
- return NErr_ConnectionFailed;
- if (start_time + timeout < time(0))
- return NErr_TimedOut;
- http_status = jnl_http_get_status(http);
- }
-
- if (http_status == HTTPGET_STATUS_ERROR)
- {
- switch(jnl_http_getreplycode(http))
- {
- case 400:
- return NErr_BadRequest;
- case 401:
- // TODO: deal with this specially
- return NErr_Unauthorized;
- case 403:
- // TODO: deal with this specially?
- return NErr_Forbidden;
- case 404:
- return NErr_NotFound;
- case 405:
- return NErr_BadMethod;
- case 406:
- return NErr_NotAcceptable;
- case 407:
- // TODO: deal with this specially
- return NErr_ProxyAuthenticationRequired;
- case 408:
- return NErr_RequestTimeout;
- case 409:
- return NErr_Conflict;
- case 410:
- return NErr_Gone;
- case 500:
- return NErr_InternalServerError;
- case 503:
- return NErr_ServiceUnavailable;
- default:
- return NErr_ConnectionFailed;
- }
- }
- else
- {
- if (!file_size)
- {
- // TODO: check range header for actual size
- file_size = jnl_http_content_length(http);
- parent.OnFileSize(file_size);
- }
- parent.OnConnected();
- return NErr_Success;
- }
- }
- void ProgressiveDownload::Internal_Write(const void *data, size_t data_len)
- {
- size_t bytes_written = fwrite(data, 1, data_len, progressive_file_write);
- fflush(progressive_file_write);
- progress_tracker.Write(bytes_written);
- }
- int ProgressiveDownload::DoRead(void *buffer, size_t bufferlen)
- {
- int ret = jnl_http_run(http);
- size_t bytes_received;
- do
- {
- ret = jnl_http_run(http);
- bytes_received = jnl_http_get_bytes(http, buffer, bufferlen);
- if (bytes_received)
- {
- Internal_Write(buffer, bytes_received);
- }
- /* TODO: benski> should we limit the number of times through this loop?
- I'm worried that if data comes in fast enough we might get stuck in this for a long time */
- } while (bytes_received == bufferlen);
- return ret;
- }
- nx_thread_return_t ProgressiveDownload::ProgressiveThread()
- {
- ns_error_t ret;
- if (!http)
- {
- ret = SetupConnection(0, (uint64_t)-1);
- if (ret != NErr_Success)
- {
- parent.OnError(ret);
- parent.OnClosed();
- return 0;
- }
- }
- ret = Connect();
- if (ret != NErr_Success)
- {
- parent.OnError(ret);
- }
- else
- {
- for (;;)
- {
- if (Wait(10) == 1)
- break; // killed!
- char buffer[HTTP_BUFFER_SIZE] = {0};
- int ret = DoRead(buffer, sizeof(buffer));
- if (ret == -1)
- break;
- else if (ret == HTTPGET_RUN_CONNECTION_CLOSED)
- {
- if (jnl_http_bytes_available(http) == 0)
- {
- if (progress_tracker.Valid(0, file_size))
- {
- // file is completely downloaded. let's gtfo
- fclose(progressive_file_write);
- progressive_file_write=0;
- break;
- }
- // if we're not completely full then we need to sit around for a potential MESSAGE_SEEK
- //while (Wait(100) == 0)
- {
- // nop
- }
- }
- }
- }
- }
- parent.OnClosed();
- return 0;
- }
- /* ------------------ */
- NXFileObject_ProgressiveDownloader::NXFileObject_ProgressiveDownloader() : download(progress_tracker, *this)
- {
- progressive_file_read=0;
- end_of_file=false;
- connected=false;
- error_code=NErr_Success;
- closed = false;
- need_seek=false;
- position=0;
- }
- NXFileObject_ProgressiveDownloader::~NXFileObject_ProgressiveDownloader()
- {
- download.Close();
- while (!closed)
- Wait(10);
- if (progressive_file_read)
- fclose(progressive_file_read);
- }
- void NXFileObject_ProgressiveDownloader::OnConnected()
- {
- nu::message_node_t *message = message_loop.AllocateMessage();
- message->message = MESSAGE_CONNECTED;
- message_loop.PostMessage(message);
- }
- void NXFileObject_ProgressiveDownloader::OnError(int error_code)
- {
- error_message_t *message = (error_message_t *)message_loop.AllocateMessage();
- message->message = MESSAGE_ERROR;
- message->error_code = error_code;
- message_loop.PostMessage(message);
- }
- void NXFileObject_ProgressiveDownloader::OnFileSize(uint64_t size)
- {
- size_message_t *message = (size_message_t *)message_loop.AllocateMessage();
- message->message = MESSAGE_SIZE;
- message->size = size;
- message_loop.PostMessage(message);
- }
- void NXFileObject_ProgressiveDownloader::OnClosed()
- {
- nu::message_node_t *message = message_loop.AllocateMessage();
- message->message = MESSAGE_CLOSED;
- message_loop.PostMessage(message);
- }
- ns_error_t NXFileObject_ProgressiveDownloader::Initialize(nx_uri_t uri, jnl_http_t http, const char *user_agent)
- {
- ReferenceCountedNXURI temp_uri;
- NXURICreateTemp(&temp_uri);
- ns_error_t ret = download.Initialize(uri, http, user_agent, temp_uri);
- if (ret != NErr_Success)
- {
- closed=true;
- return ret;
- }
- progressive_file_read = NXFile_fopen(temp_uri, nx_file_FILE_read_binary);
- for (;;)
- {
- Wait(10);
- if (error_code != NErr_Success)
- return error_code;
- if (connected)
- break;
- }
- return NErr_Success;
- }
- void NXFileObject_ProgressiveDownloader::ProcessMessage(nu::message_node_t *message)
- {
- switch(message->message)
- {
- case MESSAGE_ERROR:
- {
- error_message_t *seek_message = (error_message_t *)message;
- error_code = seek_message->error_code;
- }
- break;
- case MESSAGE_CONNECTED:
- connected = true;
- break;
- case MESSAGE_SIZE:
- {
- size_message_t *seek_message = (size_message_t *)message;
- region.end = seek_message->size;
- }
- break;
- case MESSAGE_CLOSED:
- closed=true;
- break;
- }
- message_loop.FreeMessage(message);
- }
- void NXFileObject_ProgressiveDownloader::Wait(unsigned int milliseconds)
- {
- for (;;)
- {
- nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
- if (message)
- ProcessMessage(message);
- else
- break;
- }
- nu::message_node_t *message = message_loop.PeekMessage(milliseconds);
- if (message)
- ProcessMessage(message);
- }
- bool NXFileObject_ProgressiveDownloader::WaitForRead(uint64_t size)
- {
- if (progress_tracker.Valid(position, position+size))
- return true;
- if (need_seek)
- {
- // give it just a little bit of time to avoid constant reseeks when the download thread is just barely keeping up
- Wait(10);
- if (progress_tracker.Valid(position, position+size))
- return true;
- connected=false;
- error_code=NErr_Success;
- download.Seek(position, (uint64_t)position+size);
-
- for (;;)
- {
- Wait(10);
- if (error_code != NErr_Success)
- return false;
-
- if (connected)
- break;
- }
- }
- while (!progress_tracker.Valid(position, position+size))
- {
- Wait(10);
- }
- return true;
- }
- ns_error_t NXFileObject_ProgressiveDownloader::Read(void *buffer, size_t bytes_requested, size_t *bytes_read)
- {
- if (end_of_file || position >= (region.end - region.start))
- return NErr_EndOfFile;
- // don't allow a read past the end of the file as this will confuse progress_tracker (which doesn't know/care about the file length)
- if ((position + bytes_requested) > region.end)
- bytes_requested = (size_t)(region.end - position);
- if (WaitForRead((uint64_t)bytes_requested) == false)
- {
- *bytes_read = 0;
- return error_code;
- }
- if (need_seek)
- {
- _fseeki64(progressive_file_read, position, SEEK_SET);
- need_seek=false;
- }
- /* TODO: benski> if r < bytes_requested, then we need to flush the buffer.
- on windows, we can use fflush(progressive_file_read)
- on other platforms it's not guaranteed! */
- size_t r = fread(buffer, 1, bytes_requested, progressive_file_read);
- this->position += r;
- *bytes_read = r;
- return NErr_Success;
- }
- ns_error_t NXFileObject_ProgressiveDownloader::Seek(uint64_t new_position)
- {
- if (new_position >= (region.end - region.start))
- {
- this->position = region.end - region.start;
- end_of_file=true;
- }
- else
- {
- if (new_position == position)
- return NErr_Success;
- position = new_position;
- need_seek=true;
-
- end_of_file=false;
- }
- return NErr_Success;
- }
- ns_error_t NXFileObject_ProgressiveDownloader::Tell(uint64_t *position)
- {
- if (end_of_file)
- *position = region.end - region.start;
- else
- *position = this->position - region.start;
- return NErr_Success;
- }
- ns_error_t NXFileObject_ProgressiveDownloader::PeekByte(uint8_t *byte)
- {
- if (position == region.end)
- return NErr_EndOfFile;
- // make sure we have enough room
- if (WaitForRead((uint64_t)1) == false)
- return error_code;
- if (need_seek)
- {
- _fseeki64(progressive_file_read, position, SEEK_SET);
- need_seek=false;
- }
- int read_byte = fgetc(progressive_file_read);
- if (read_byte != EOF)
- ungetc(read_byte, progressive_file_read);
- else
- {
- /* TODO: benski> if we hit the point, then we actually need to flush the buffer.
- on some platforms, fflush(progressive_file_read) will do that, but it's not guaranteed! */
- return NErr_EndOfFile;
- }
- *byte = (uint8_t)read_byte;
- return NErr_Success;
- }
- ns_error_t NXFileObject_ProgressiveDownloader::Sync()
- {
- return NErr_NotImplemented;
- }
- ns_error_t NXFileObject_ProgressiveDownloader::Truncate()
- {
- return NErr_NotImplemented;
- }
- ns_error_t NXFileObject_ProgressiveDownloader::Write(const void *buffer, size_t bytes)
- {
- return NErr_NotImplemented;
- }
- bool NXFileObject_ProgressiveDownloader::Available(uint64_t size, uint64_t *available)
- {
- uint64_t end = position+size;
- if (end > region.end)
- end = region.end;
- if (position == region.end)
- {
- if (available)
- *available=0;
- return true;
- }
- return progress_tracker.Valid(position, end, available);
- }
- ns_error_t NXFileOpenProgressiveDownloader(nx_file_t *out_file, nx_uri_t filename, nx_file_FILE_flags_t flags, jnl_http_t http, const char *user_agent)
- {
- NXFileObject_ProgressiveDownloader *file_object = new (std::nothrow) NXFileObject_ProgressiveDownloader;
- if (!file_object)
- return NErr_OutOfMemory;
- ns_error_t ret = file_object->Initialize(filename, http, user_agent);
- if (ret != NErr_Success)
- {
- delete file_object;
- return ret;
- }
- *out_file = (nx_file_t)file_object;
- return NErr_Success;
- }
- ns_error_t NXFileProgressiveDownloaderAvailable(nx_file_t _f, uint64_t size, uint64_t *available)
- {
- if (!_f)
- return NErr_BadParameter;
- NXFileObject_ProgressiveDownloader *f = (NXFileObject_ProgressiveDownloader *)_f;
- if (f->Available(size, available))
- return NErr_True;
- else
- return NErr_False;
- }
|