[pacman-dev] [PATCH 2/2] Parallelize integrity checks.
Dan McGee
dpmcgee at gmail.com
Fri Feb 25 11:20:52 EST 2011
On Mon, Feb 21, 2011 at 4:38 PM, Tavian Barnes <tavianator at gmail.com> wrote:
> Use the new _alpm_for_each_cpu() API to perform integrity checks in
> parallel. This speeds up integrity checks drastically on multi-core
> machines.
> ---
> lib/libalpm/sync.c | 186 +++++++++++++++++++++++++++++++++++++++-------------
> 1 files changed, 141 insertions(+), 45 deletions(-)
>
> diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
> index 859b8c9..52c87e1 100644
> --- a/lib/libalpm/sync.c
> +++ b/lib/libalpm/sync.c
> @@ -32,6 +32,7 @@
> #include <unistd.h>
> #include <time.h>
> #include <limits.h>
> +#include <pthread.h>
>
> /* libalpm */
> #include "sync.h"
> @@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename,
> return(ret);
> }
>
> +typedef struct {
> + pmtrans_t *trans;
> + alpm_list_t *deltas;
> + alpm_list_t **data;
> + pthread_mutex_t *mutex;
> +} _alpm_delta_integrity_payload;
We should stop typedef-ing all of these damn helper structs and just
use named struct types.
> +
> +static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus)
> +{
> + _alpm_delta_integrity_payload *payload = ptr;
> + pmtrans_t *trans = payload->trans;
> + alpm_list_t *deltas = payload->deltas;
> + alpm_list_t **data = payload->data;
> + pthread_mutex_t *mutex = payload->mutex;
> + size_t numdeltas = alpm_list_count(deltas);
> +
> + alpm_list_t *i;
> + size_t count = 0, range = (numdeltas * thread) / numcpus;
> + for (i = deltas; count < range; i = i->next, count++);
> +
> + int errors = 0;
> + range = (numdeltas * (thread + 1)) / numcpus;
> + pthread_mutex_lock(mutex);
> + for(i = deltas; i; i = i->next) {
> + pmdelta_t *d = alpm_list_getdata(i);
> + const char *filename = alpm_delta_get_filename(d);
> + const char *md5sum = alpm_delta_get_md5sum(d);
> +
> + /* Calculate md5sums in parallel */
> + pthread_mutex_unlock(mutex);
> + int test = test_md5sum(trans, filename, md5sum);
> + pthread_mutex_lock(mutex);
What mutex is this? And why does it get unlocked for only this part,
but need to be locked at all elsewhere? Some clarity on the name would
be good.
> +
> + if(test != 0) {
> + errors++;
> + *data = alpm_list_add(*data, strdup(filename));
> + }
> + }
> + pthread_mutex_unlock(mutex);
> +
> + return errors;
> +}
> +
> +typedef struct {
> + pmtrans_t *trans;
> + alpm_list_t **data;
> + size_t numtargs;
> + size_t *current;
> + pthread_mutex_t *mutex;
> +} _alpm_integrity_payload;
> +
> +static int _alpm_check_integrity(void *ptr, int thread, int numcpus)
_alpm_check_pkg_integrity to match naming scheme?
> +{
> + _alpm_integrity_payload *payload = ptr;
> + pmtrans_t *trans = payload->trans;
> + alpm_list_t **data = payload->data;
> + size_t numtargs = payload->numtargs;
> + size_t *current = payload->current;
> + pthread_mutex_t *mutex = payload->mutex;
> +
> + alpm_list_t *i;
> + size_t count = 0, range = (numtargs * thread) / numcpus;
> + for (i = trans->add; count < range; i = i->next, count++);
> +
> + int errors = 0;
> + range = (numtargs * (thread + 1)) / numcpus;
> + pthread_mutex_lock(mutex);
> + for(; count < range; i = i->next, count++) {
> + pmpkg_t *spkg = i->data;
> + if(spkg->origin == PKG_FROM_FILE) {
> + continue; /* pkg_load() has been already called, this package is valid */
> + }
> +
> + const char *filename = alpm_pkg_get_filename(spkg);
> + const char *md5sum = alpm_pkg_get_md5sum(spkg);
> +
> + /* Calculate md5sums in parallel */
> + pthread_mutex_unlock(mutex);
> + int test = test_md5sum(trans, filename, md5sum);
> + pthread_mutex_lock(mutex);
> +
> + if(test != 0) {
> + errors++;
> + *data = alpm_list_add(*data, strdup(filename));
> + goto next;
> + }
> + /* load the package file and replace pkgcache entry with it in the target list */
> + /* TODO: alpm_pkg_get_db() will not work on this target anymore */
> + _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
> + char *filepath = _alpm_filecache_find(filename);
> + pmpkg_t *pkgfile;
> +
> + /* Load packages in parallel */
> + pthread_mutex_unlock(mutex);
> + int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
> + pthread_mutex_lock(mutex);
> +
> + if(loaded != 0) {
> + _alpm_pkg_free(pkgfile);
> + errors++;
> + *data = alpm_list_add(*data, strdup(filename));
> + FREE(filepath);
> + goto next;
> + }
> + FREE(filepath);
> + pkgfile->reason = spkg->reason; /* copy over install reason */
> + i->data = pkgfile;
> + _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
> +
> + next:
> + ++*current;
> + int percent = (*current * 100) / numtargs;
> + PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
> + numtargs, *current);
> + }
> + pthread_mutex_unlock(mutex);
> +
> + return errors;
> +}
> +
> int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
> {
> alpm_list_t *i, *j, *files = NULL;
> @@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
> /* if we have deltas to work with */
> if(handle->usedelta && deltas) {
> int ret = 0;
> - errors = 0;
> +
> /* Check integrity of deltas */
> EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL);
>
> - for(i = deltas; i; i = i->next) {
> - pmdelta_t *d = alpm_list_getdata(i);
> - const char *filename = alpm_delta_get_filename(d);
> - const char *md5sum = alpm_delta_get_md5sum(d);
> + static pthread_mutex_t delta_integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
> + _alpm_delta_integrity_payload payload = {
> + .trans = trans,
> + .deltas = deltas,
> + .data = data,
> + .mutex = &delta_integrity_mutex
> + };
> +
> + errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, &payload);
> + EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
>
> - if(test_md5sum(trans, filename, md5sum) != 0) {
> - errors++;
> - *data = alpm_list_add(*data, strdup(filename));
> - }
> - }
> if(errors) {
> pm_errno = PM_ERR_DLT_INVALID;
> goto error;
> }
> - EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
>
> /* Use the deltas to generate the packages */
> EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL);
> @@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
> numtargs = alpm_list_count(trans->add);
> EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
>
> - errors = 0;
> - for(i = trans->add; i; i = i->next, current++) {
> - pmpkg_t *spkg = i->data;
> - int percent = (current * 100) / numtargs;
> - if(spkg->origin == PKG_FROM_FILE) {
> - continue; /* pkg_load() has been already called, this package is valid */
> - }
> - PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
> - numtargs, current);
> -
> - const char *filename = alpm_pkg_get_filename(spkg);
> - const char *md5sum = alpm_pkg_get_md5sum(spkg);
> + static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
> + _alpm_integrity_payload payload = {
> + .trans = trans,
> + .data = data,
> + .numtargs = numtargs,
> + .current = &current,
> + .mutex = &integrity_mutex
> + };
> + errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload);
>
> - if(test_md5sum(trans, filename, md5sum) != 0) {
> - errors++;
> - *data = alpm_list_add(*data, strdup(filename));
> - continue;
> - }
> - /* load the package file and replace pkgcache entry with it in the target list */
> - /* TODO: alpm_pkg_get_db() will not work on this target anymore */
> - _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
> - char *filepath = _alpm_filecache_find(filename);
> - pmpkg_t *pkgfile;
> - if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
> - _alpm_pkg_free(pkgfile);
> - errors++;
> - *data = alpm_list_add(*data, strdup(filename));
> - FREE(filepath);
> - continue;
> - }
> - FREE(filepath);
> - pkgfile->reason = spkg->reason; /* copy over install reason */
> - i->data = pkgfile;
> - _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
> - }
> PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
> numtargs, current);
> EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);
> --
> 1.7.4.1
I would love it if this were a two-part patch: first refactor the two
functions out, then parallelize the calls to them.
-Dan
More information about the pacman-dev
mailing list