[pacman-dev] [PATCH] Implement multiplexed download using mCURL

Anatol Pomozov anatol.pomozov at gmail.com
Tue Mar 10 00:30:18 UTC 2020


Hello

This is the largest change (in terms of LoC) in this patch series. It
adds the multi-curl machinery, the bread and butter of the parallel
download functionality.

On Mon, Mar 9, 2020 at 5:28 PM Anatol Pomozov <anatol.pomozov at gmail.com> wrote:
>
> curl_multi_download_internal() is the main loop that creates up to
> 'ParallelDownloads' easy curl handles, adds them to mcurl and then
> performs curl execution. This is when the parallel downloads happen.
> Once any of the downloads complete the function checks its result.
> In case if the download fails it initiates retry with the next server
> from payload->servers list. At the download completion all the payload
> resources are cleaned up.
>
> curl_multi_handle_single_done() is essentially a refactored version of
> curl_download_internal() adapted for multi_curl. Once mcurl porting is
> complete curl_download_internal() will be removed.
>
> Signed-off-by: Anatol Pomozov <anatol.pomozov at gmail.com>
> ---
>  lib/libalpm/dload.c | 361 +++++++++++++++++++++++++++++++++++++++++++-
>  lib/libalpm/dload.h |   2 +
>  2 files changed, 359 insertions(+), 4 deletions(-)
>
> diff --git a/lib/libalpm/dload.c b/lib/libalpm/dload.c
> index 3570e234..06bce330 100644
> --- a/lib/libalpm/dload.c
> +++ b/lib/libalpm/dload.c
> @@ -18,6 +18,7 @@
>   *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
>   */
>
> +#include <assert.h>
>  #include <stdlib.h>
>  #include <stdio.h>
>  #include <errno.h>
> @@ -271,6 +272,8 @@ static void curl_set_handle_opts(struct dload_payload *payload,
>         curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, 60L);
>         curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
>
> +       curl_easy_setopt(curl, CURLOPT_PRIVATE, (void *)payload);
> +
>         _alpm_log(handle, ALPM_LOG_DEBUG, "url: %s\n", payload->fileurl);
>
>         if(payload->max_size) {
> @@ -601,15 +604,365 @@ cleanup:
>         return ret;
>  }
>
> +/* Return 0 if retry was successful, -1 otherwise */
> +static int curl_multi_retry_next_server(CURLM *curlm, CURL *curl, struct dload_payload *payload)
> +{
> +       const char *server;
> +       size_t len;
> +       alpm_handle_t *handle = payload->handle;
> +
> +       payload->servers = payload->servers->next;
> +       if(!payload->servers) {
> +               return -1;
> +       }
> +       server = payload->servers->data;
> +
> +       /* regenerate a new fileurl */
> +       free(payload->fileurl);
> +       len = strlen(server) + strlen(payload->filepath) + 2;
> +       MALLOC(payload->fileurl, len, RET_ERR(handle, ALPM_ERR_MEMORY, -1));
> +       snprintf(payload->fileurl, len, "%s/%s", server, payload->filepath);
> +
> +       if(payload->unlink_on_fail) {
> +               /* we keep the file for a new retry but remove its data if any */
> +               fflush(payload->localf);
> +               ftruncate(fileno(payload->localf), 0);
> +               fseek(payload->localf, 0, SEEK_SET);
> +       }
> +
> +       /* Set curl with the new URL */
> +       curl_easy_setopt(curl, CURLOPT_URL, payload->fileurl);
> +
> +       curl_multi_remove_handle(curlm, curl);
> +       curl_multi_add_handle(curlm, curl);
> +
> +       return 0;
> +}
> +
> +/* Returns 2 if download retry happened
> + * Returns 1 if the file is up-to-date
> + * Returns 0 if current payload is completed successfully
> + * Returns -1 if an error happened
> + */
> +static int curl_multi_handle_single_done(CURLM *curlm, CURLMsg *msg, const char *localpath)
> +{
> +       alpm_handle_t *handle = NULL;
> +       struct dload_payload *payload = NULL;
> +       CURL *curl = msg->easy_handle;
> +       CURLcode curlerr;
> +       char *effective_url;
> +       long timecond;
> +       double remote_size, bytes_dl = 0;
> +       long remote_time = -1;
> +       struct stat st;
> +       char hostname[HOSTNAME_SIZE];
> +       int ret = -1;
> +
> +       curlerr = curl_easy_getinfo(curl, CURLINFO_PRIVATE, &payload);
> +       assert(curlerr == CURLE_OK);
> +       handle = payload->handle;
> +
> +       curl_gethost(payload->fileurl, hostname, sizeof(hostname));
> +       curlerr = msg->data.result;
> +       _alpm_log(handle, ALPM_LOG_DEBUG, "curl returned result %d from transfer\n",
> +                       curlerr);
> +
> +       /* was it a success? */
> +       switch(curlerr) {
> +               case CURLE_OK:
> +                       /* get http/ftp response code */
> +                       _alpm_log(handle, ALPM_LOG_DEBUG, "response code: %ld\n", payload->respcode);
> +                       if(payload->respcode >= 400) {
> +                               payload->unlink_on_fail = 1;
> +                               if(!payload->errors_ok) {
> +                                       handle->pm_errno = ALPM_ERR_RETRIEVE;
> +                                       /* non-translated message is same as libcurl */
> +                                       snprintf(payload->error_buffer, sizeof(payload->error_buffer),
> +                                                       "The requested URL returned error: %ld", payload->respcode);
> +                                       _alpm_log(handle, ALPM_LOG_ERROR,
> +                                                       _("failed retrieving file '%s' from %s : %s\n"),
> +                                                       payload->remote_name, hostname, payload->error_buffer);
> +                               }
> +                               if(curl_multi_retry_next_server(curlm, curl, payload) == 0) {
> +                                       return 2;
> +                               } else {
> +                                       goto cleanup;
> +                               }
> +                       }
> +                       break;
> +               case CURLE_ABORTED_BY_CALLBACK:
> +                       /* handle the interrupt accordingly */
> +                       if(dload_interrupted == ABORT_OVER_MAXFILESIZE) {
> +                               curlerr = CURLE_FILESIZE_EXCEEDED;
> +                               payload->unlink_on_fail = 1;
> +                               handle->pm_errno = ALPM_ERR_LIBCURL;
> +                               _alpm_log(handle, ALPM_LOG_ERROR,
> +                                               _("failed retrieving file '%s' from %s : expected download size exceeded\n"),
> +                                               payload->remote_name, hostname);
> +                       }
> +                       goto cleanup;
> +               case CURLE_COULDNT_RESOLVE_HOST:
> +                       payload->unlink_on_fail = 1;
> +                       handle->pm_errno = ALPM_ERR_SERVER_BAD_URL;
> +                       _alpm_log(handle, ALPM_LOG_ERROR,
> +                                       _("failed retrieving file '%s' from %s : %s\n"),
> +                                       payload->remote_name, hostname, payload->error_buffer);
> +                       if(curl_multi_retry_next_server(curlm, curl, payload) == 0) {
> +                               return 2;
> +                       } else {
> +                               goto cleanup;
> +                       }
> +               default:
> +                       /* delete zero length downloads */
> +                       if(fstat(fileno(payload->localf), &st) == 0 && st.st_size == 0) {
> +                               payload->unlink_on_fail = 1;
> +                       }
> +                       if(!payload->errors_ok) {
> +                               handle->pm_errno = ALPM_ERR_LIBCURL;
> +                               _alpm_log(handle, ALPM_LOG_ERROR,
> +                                               _("failed retrieving file '%s' from %s : %s\n"),
> +                                               payload->remote_name, hostname, payload->error_buffer);
> +                       } else {
> +                               _alpm_log(handle, ALPM_LOG_DEBUG,
> +                                               "failed retrieving file '%s' from %s : %s\n",
> +                                               payload->remote_name, hostname, payload->error_buffer);
> +                       }
> +                       if(curl_multi_retry_next_server(curlm, curl, payload) == 0) {
> +                               return 2;
> +                       } else {
> +                               goto cleanup;
> +                       }
> +       }
> +
> +       /* retrieve info about the state of the transfer */
> +       curl_easy_getinfo(curl, CURLINFO_FILETIME, &remote_time);
> +       curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &remote_size);
> +       curl_easy_getinfo(curl, CURLINFO_SIZE_DOWNLOAD, &bytes_dl);
> +       curl_easy_getinfo(curl, CURLINFO_CONDITION_UNMET, &timecond);
> +       curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effective_url);
> +
> +       /* time condition was met and we didn't download anything. we need to
> +        * clean up the 0 byte .part file that's left behind. */
> +       if(timecond == 1 && DOUBLE_EQ(bytes_dl, 0)) {
> +               _alpm_log(handle, ALPM_LOG_DEBUG, "file met time condition\n");
> +               ret = 1;
> +               unlink(payload->tempfile_name);
> +               goto cleanup;
> +       }
> +
> +       /* remote_size isn't necessarily the full size of the file, just what the
> +        * server reported as remaining to download. compare it to what curl reported
> +        * as actually being transferred during curl_easy_perform() */
> +       if(!DOUBLE_EQ(remote_size, -1) && !DOUBLE_EQ(bytes_dl, -1) &&
> +                       !DOUBLE_EQ(bytes_dl, remote_size)) {
> +               _alpm_log(handle, ALPM_LOG_ERROR, _("%s appears to be truncated: %jd/%jd bytes\n"),
> +                               payload->remote_name, (intmax_t)bytes_dl, (intmax_t)remote_size);
> +               GOTO_ERR(handle, ALPM_ERR_RETRIEVE, cleanup);
> +       }
> +
> +       if(payload->trust_remote_name) {
> +               if(payload->content_disp_name) {
> +                       /* content-disposition header has a better name for our file */
> +                       free(payload->destfile_name);
> +                       payload->destfile_name = get_fullpath(localpath,
> +                               get_filename(payload->content_disp_name), "");
> +               } else {
> +                       const char *effective_filename = strrchr(effective_url, '/');
> +                       if(effective_filename && strlen(effective_filename) > 2) {
> +                               effective_filename++;
> +
> +                               /* if destfile was never set, we wrote to a tempfile. even if destfile is
> +                                * set, we may have followed some redirects and the effective url may
> +                                * have a better suggestion as to what to name our file. in either case,
> +                                * refactor destfile to this newly derived name. */
> +                               if(!payload->destfile_name || strcmp(effective_filename,
> +                                                       strrchr(payload->destfile_name, '/') + 1) != 0) {
> +                                       free(payload->destfile_name);
> +                                       payload->destfile_name = get_fullpath(localpath, effective_filename, "");
> +                               }
> +                       }
> +               }
> +       }
> +
> +       ret = 0;
> +
> +cleanup:
> +       /* disconnect relationships from the curl handle for things that might go out
> +        * of scope, but could still be touched on connection teardown. This really
> +        * only applies to FTP transfers. */
> +       curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
> +       curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, (char *)NULL);
> +
> +       if(payload->localf != NULL) {
> +               fclose(payload->localf);
> +               utimes_long(payload->tempfile_name, remote_time);
> +       }
> +
> +       if(ret == 0) {
> +               if(payload->destfile_name) {
> +                       if(rename(payload->tempfile_name, payload->destfile_name)) {
> +                               _alpm_log(handle, ALPM_LOG_ERROR, _("could not rename %s to %s (%s)\n"),
> +                                               payload->tempfile_name, payload->destfile_name, strerror(errno));
> +                               ret = -1;
> +                       }
> +               }
> +       }
> +
> +       if((ret == -1 || dload_interrupted) && payload->unlink_on_fail &&
> +                       payload->tempfile_name) {
> +               unlink(payload->tempfile_name);
> +       }
> +
> +       // TODO: report that the download has been completed
> +
> +       curl_multi_remove_handle(curlm, curl);
> +       curl_easy_cleanup(curl);
> +       payload->curl = NULL;
> +
> +       FREE(payload->fileurl);
> +       return ret;
> +}
> +
> +/* Returns 0 if a new download transaction has been successfully started
> + * Returns -1 if an error happened while starting a new download
> + */
> +static int curl_multi_add_payload(alpm_handle_t *handle, CURLM *curlm,
> +               struct dload_payload *payload, const char *localpath)
> +{
> +       size_t len;
> +       const char *server;
> +       CURL *curl = NULL;
> +       char hostname[HOSTNAME_SIZE];
> +
> +       ASSERT(payload->servers, RET_ERR(handle, ALPM_ERR_SERVER_NONE, -1));
> +       server = payload->servers->data;
> +
> +       curl = curl_easy_init();
> +       payload->curl = curl;
> +
> +       len = strlen(server) + strlen(payload->filepath) + 2;
> +       MALLOC(payload->fileurl, len, GOTO_ERR(handle, ALPM_ERR_MEMORY, cleanup));
> +       snprintf(payload->fileurl, len, "%s/%s", server, payload->filepath);
> +
> +       payload->tempfile_openmode = "wb";
> +       if(!payload->remote_name) {
> +               STRDUP(payload->remote_name, get_filename(payload->fileurl),
> +                       GOTO_ERR(handle, ALPM_ERR_MEMORY, cleanup));
> +       }
> +       if(curl_gethost(payload->fileurl, hostname, sizeof(hostname)) != 0) {
> +               _alpm_log(handle, ALPM_LOG_ERROR, _("url '%s' is invalid\n"), payload->fileurl);
> +               GOTO_ERR(handle, ALPM_ERR_SERVER_BAD_URL, cleanup);
> +       }
> +
> +       if(payload->remote_name && strlen(payload->remote_name) > 0) {
> +               payload->destfile_name = get_fullpath(localpath, payload->remote_name, "");
> +               payload->tempfile_name = get_fullpath(localpath, payload->remote_name, ".part");
> +               if(!payload->destfile_name || !payload->tempfile_name) {
> +                       goto cleanup;
> +               }
> +       } else {
> +               /* URL doesn't contain a filename, so make a tempfile. We can't support
> +                * resuming this kind of download; partial transfers will be destroyed */
> +               payload->unlink_on_fail = 1;
> +
> +               payload->localf = create_tempfile(payload, localpath);
> +               if(payload->localf == NULL) {
> +                       goto cleanup;
> +               }
> +       }
> +
> +       curl_set_handle_opts(payload, curl, payload->error_buffer);
> +
> +       if(payload->max_size == payload->initial_size) {
> +               /* .part file is complete */
> +               goto cleanup;
> +       }
> +
> +       if(payload->localf == NULL) {
> +               payload->localf = fopen(payload->tempfile_name, payload->tempfile_openmode);
> +               if(payload->localf == NULL) {
> +                       _alpm_log(handle, ALPM_LOG_ERROR,
> +                                       _("could not open file %s: %s\n"),
> +                                       payload->tempfile_name, strerror(errno));
> +                       GOTO_ERR(handle, ALPM_ERR_RETRIEVE, cleanup);
> +               }
> +       }
> +
> +       _alpm_log(handle, ALPM_LOG_DEBUG,
> +                       "opened tempfile for download: %s (%s)\n", payload->tempfile_name,
> +                       payload->tempfile_openmode);
> +
> +       curl_easy_setopt(curl, CURLOPT_WRITEDATA, payload->localf);
> +       curl_multi_add_handle(curlm, curl);
> +       return 0;
> +
> +cleanup:
> +       FREE(payload->fileurl);
> +       FREE(payload->tempfile_name);
> +       FREE(payload->destfile_name);
> +       FREE(payload->content_disp_name);
> +       curl_easy_cleanup(curl);
> +       return -1;
> +}
> +
>  static int curl_multi_download_internal(alpm_handle_t *handle,
>                 alpm_list_t *payloads /* struct dload_payload */,
>                 const char *localpath)
>  {
> -       (void)handle;
> -       (void)payloads;
> -       (void)localpath;
> -       return 0;
> +       int still_running = 0;
> +       int err = 0;
> +       int parallel_downloads = handle->parallel_downloads;
> +
> +       CURLM *curlm = handle->curlm;
> +       CURLMsg *msg;
> +
> +       while(still_running || payloads) {
> +               int msgs_left = -1;
> +
> +               for(; still_running < parallel_downloads && payloads; still_running++) {
> +                       struct dload_payload *payload = payloads->data;
> +
> +                       if(curl_multi_add_payload(handle, curlm, payloads->data, localpath) == 0) {
> +                               // TODO: report that download has started
> +                               payloads = payloads->next;
> +                       } else {
> +                               // the payload failed to start, do not start any new downloads just wait until
> +                               // active one complete.
> +                               _alpm_log(handle, ALPM_LOG_ERROR, "failed to setup a download payload for %s\n", payload->remote_name);
> +                               payloads = NULL;
> +                               err = -1;
> +                       }
> +               }
> +
> +               CURLMcode mc = curl_multi_perform(curlm, &still_running);
> +
> +               if(mc != CURLM_OK) {
> +                       _alpm_log(handle, ALPM_LOG_ERROR, "curl returned error %d from transfer\n", mc);
> +                       payloads = NULL;
> +                       err = -1;
> +               }
> +
> +               while((msg = curl_multi_info_read(curlm, &msgs_left))) {
> +                       if(msg->msg == CURLMSG_DONE) {
> +                               int done_code = curl_multi_handle_single_done(curlm, msg, localpath);
> +                               if(done_code == 2) {
> +                                       /* in case of a retry increase the counter of active requests
> +                                        * to avoid exiting the loop early
> +                                        */
> +                                       still_running++;
> +                               }
> +                       } else {
> +                               _alpm_log(handle, ALPM_LOG_ERROR, "curl curl_multi_info_read error %d\n", msg->msg);
> +                       }
> +               }
> +               if(still_running) {
> +                       curl_multi_wait(curlm, NULL, 0, 1000, NULL);
> +               }
> +       }
> +
> +       return err;
>  }
> +
>  #endif
>
>  /** Download a file given by a URL to a local directory.
> diff --git a/lib/libalpm/dload.h b/lib/libalpm/dload.h
> index e87b6a93..a40b51b7 100644
> --- a/lib/libalpm/dload.h
> +++ b/lib/libalpm/dload.h
> @@ -45,6 +45,8 @@ struct dload_payload {
>         int cb_initialized;
>  #ifdef HAVE_LIBCURL
>         CURL *curl;
> +       char error_buffer[CURL_ERROR_SIZE];
> +       FILE *localf; /* temp download file */
>  #endif
>  };
>
> --
> 2.25.1
>


More information about the pacman-dev mailing list