[pacman-dev] [PATCH 2/2] Parallelize integrity checks.

Dan McGee dpmcgee at gmail.com
Fri Feb 25 11:20:52 EST 2011


On Mon, Feb 21, 2011 at 4:38 PM, Tavian Barnes <tavianator at gmail.com> wrote:
> Use the new _alpm_for_each_cpu() API to perform integrity checks in
> parallel.  This speeds up integrity checks drastically on multi-core
> machines.
> ---
>  lib/libalpm/sync.c |  186 +++++++++++++++++++++++++++++++++++++++-------------
>  1 files changed, 141 insertions(+), 45 deletions(-)
>
> diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
> index 859b8c9..52c87e1 100644
> --- a/lib/libalpm/sync.c
> +++ b/lib/libalpm/sync.c
> @@ -32,6 +32,7 @@
>  #include <unistd.h>
>  #include <time.h>
>  #include <limits.h>
> +#include <pthread.h>
>
>  /* libalpm */
>  #include "sync.h"
> @@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename,
>        return(ret);
>  }
>
> +typedef struct {
> +       pmtrans_t *trans;
> +       alpm_list_t *deltas;
> +       alpm_list_t **data;
> +       pthread_mutex_t *mutex;
> +} _alpm_delta_integrity_payload;
We should stop typedef-ing all of these damn helper structs and just
use named struct types.

> +
> +static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus)
> +{
> +       _alpm_delta_integrity_payload *payload = ptr;
> +       pmtrans_t *trans = payload->trans;
> +       alpm_list_t *deltas = payload->deltas;
> +       alpm_list_t **data = payload->data;
> +       pthread_mutex_t *mutex = payload->mutex;
> +       size_t numdeltas = alpm_list_count(deltas);
> +
> +       alpm_list_t *i;
> +       size_t count = 0, range = (numdeltas * thread) / numcpus;
> +       for (i = deltas; count < range; i = i->next, count++);
> +
> +       int errors = 0;
> +       range = (numdeltas * (thread + 1)) / numcpus;
> +       pthread_mutex_lock(mutex);
> +       for(i = deltas; i; i = i->next) {
> +               pmdelta_t *d = alpm_list_getdata(i);
> +               const char *filename = alpm_delta_get_filename(d);
> +               const char *md5sum = alpm_delta_get_md5sum(d);
> +
> +               /* Calculate md5sums in parallel */
> +               pthread_mutex_unlock(mutex);
> +               int test = test_md5sum(trans, filename, md5sum);
> +               pthread_mutex_lock(mutex);
What mutex is this? Why is it unlocked only around this call, and why
does it need to be held at all for the rest of the loop? A clearer
name would help.
> +
> +               if(test != 0) {
> +                       errors++;
> +                       *data = alpm_list_add(*data, strdup(filename));
> +               }
> +       }
> +       pthread_mutex_unlock(mutex);
> +
> +       return errors;
> +}
> +
> +typedef struct {
> +       pmtrans_t *trans;
> +       alpm_list_t **data;
> +       size_t numtargs;
> +       size_t *current;
> +       pthread_mutex_t *mutex;
> +} _alpm_integrity_payload;
> +
> +static int _alpm_check_integrity(void *ptr, int thread, int numcpus)
Rename to _alpm_check_pkg_integrity to match the naming scheme of
_alpm_check_delta_integrity?

> +{
> +       _alpm_integrity_payload *payload = ptr;
> +       pmtrans_t *trans = payload->trans;
> +       alpm_list_t **data = payload->data;
> +       size_t numtargs = payload->numtargs;
> +       size_t *current = payload->current;
> +       pthread_mutex_t *mutex = payload->mutex;
> +
> +       alpm_list_t *i;
> +       size_t count = 0, range = (numtargs * thread) / numcpus;
> +       for (i = trans->add; count < range; i = i->next, count++);
> +
> +       int errors = 0;
> +       range = (numtargs * (thread + 1)) / numcpus;
> +       pthread_mutex_lock(mutex);
> +       for(; count < range; i = i->next, count++) {
> +               pmpkg_t *spkg = i->data;
> +               if(spkg->origin == PKG_FROM_FILE) {
> +                       continue; /* pkg_load() has been already called, this package is valid */
> +               }
> +
> +               const char *filename = alpm_pkg_get_filename(spkg);
> +               const char *md5sum = alpm_pkg_get_md5sum(spkg);
> +
> +               /* Calculate md5sums in parallel */
> +               pthread_mutex_unlock(mutex);
> +               int test = test_md5sum(trans, filename, md5sum);
> +               pthread_mutex_lock(mutex);
> +
> +               if(test != 0) {
> +                       errors++;
> +                       *data = alpm_list_add(*data, strdup(filename));
> +                       goto next;
> +               }
> +               /* load the package file and replace pkgcache entry with it in the target list */
> +               /* TODO: alpm_pkg_get_db() will not work on this target anymore */
> +               _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
> +               char *filepath = _alpm_filecache_find(filename);
> +               pmpkg_t *pkgfile;
> +
> +               /* Load packages in parallel */
> +               pthread_mutex_unlock(mutex);
> +               int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
> +               pthread_mutex_lock(mutex);
> +
> +               if(loaded != 0) {
> +                       _alpm_pkg_free(pkgfile);
> +                       errors++;
> +                       *data = alpm_list_add(*data, strdup(filename));
> +                       FREE(filepath);
> +                       goto next;
> +               }
> +               FREE(filepath);
> +               pkgfile->reason = spkg->reason; /* copy over install reason */
> +               i->data = pkgfile;
> +               _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
> +
> +       next:
> +               ++*current;
> +               int percent = (*current * 100) / numtargs;
> +               PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
> +                               numtargs, *current);
> +       }
> +       pthread_mutex_unlock(mutex);
> +
> +       return errors;
> +}
> +
>  int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
>  {
>        alpm_list_t *i, *j, *files = NULL;
> @@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
>        /* if we have deltas to work with */
>        if(handle->usedelta && deltas) {
>                int ret = 0;
> -               errors = 0;
> +
>                /* Check integrity of deltas */
>                EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL);
>
> -               for(i = deltas; i; i = i->next) {
> -                       pmdelta_t *d = alpm_list_getdata(i);
> -                       const char *filename = alpm_delta_get_filename(d);
> -                       const char *md5sum = alpm_delta_get_md5sum(d);
> +               static pthread_mutex_t delta_integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
> +               _alpm_delta_integrity_payload payload = {
> +                       .trans  = trans,
> +                       .deltas = deltas,
> +                       .data   = data,
> +                       .mutex  = &delta_integrity_mutex
> +               };
> +
> +               errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, &payload);
> +               EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
>
> -                       if(test_md5sum(trans, filename, md5sum) != 0) {
> -                               errors++;
> -                               *data = alpm_list_add(*data, strdup(filename));
> -                       }
> -               }
>                if(errors) {
>                        pm_errno = PM_ERR_DLT_INVALID;
>                        goto error;
>                }
> -               EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
>
>                /* Use the deltas to generate the packages */
>                EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL);
> @@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
>        numtargs = alpm_list_count(trans->add);
>        EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
>
> -       errors = 0;
> -       for(i = trans->add; i; i = i->next, current++) {
> -               pmpkg_t *spkg = i->data;
> -               int percent = (current * 100) / numtargs;
> -               if(spkg->origin == PKG_FROM_FILE) {
> -                       continue; /* pkg_load() has been already called, this package is valid */
> -               }
> -               PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
> -                               numtargs, current);
> -
> -               const char *filename = alpm_pkg_get_filename(spkg);
> -               const char *md5sum = alpm_pkg_get_md5sum(spkg);
> +       static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
> +       _alpm_integrity_payload payload = {
> +               .trans    = trans,
> +               .data     = data,
> +               .numtargs = numtargs,
> +               .current  = &current,
> +               .mutex    = &integrity_mutex
> +       };
> +       errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload);
>
> -               if(test_md5sum(trans, filename, md5sum) != 0) {
> -                       errors++;
> -                       *data = alpm_list_add(*data, strdup(filename));
> -                       continue;
> -               }
> -               /* load the package file and replace pkgcache entry with it in the target list */
> -               /* TODO: alpm_pkg_get_db() will not work on this target anymore */
> -               _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
> -               char *filepath = _alpm_filecache_find(filename);
> -               pmpkg_t *pkgfile;
> -               if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
> -                       _alpm_pkg_free(pkgfile);
> -                       errors++;
> -                       *data = alpm_list_add(*data, strdup(filename));
> -                       FREE(filepath);
> -                       continue;
> -               }
> -               FREE(filepath);
> -               pkgfile->reason = spkg->reason; /* copy over install reason */
> -               i->data = pkgfile;
> -               _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
> -       }
>        PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
>                        numtargs, current);
>        EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);
> --
> 1.7.4.1

I would love it if this were a two-part patch: first refactor the two
functions out, then parallelize the calls to them.

-Dan


More information about the pacman-dev mailing list