On Mon, Feb 21, 2011 at 4:38 PM, Tavian Barnes <tavianator@gmail.com> wrote:
Use the new _alpm_for_each_cpu() API to perform integrity checks in parallel. This speeds up integrity checks drastically on multi-core machines. --- lib/libalpm/sync.c | 186 +++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 141 insertions(+), 45 deletions(-)
diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c index 859b8c9..52c87e1 100644 --- a/lib/libalpm/sync.c +++ b/lib/libalpm/sync.c @@ -32,6 +32,7 @@ #include <unistd.h> #include <time.h> #include <limits.h> +#include <pthread.h>
/* libalpm */ #include "sync.h" @@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename, return(ret); }
+typedef struct { + pmtrans_t *trans; + alpm_list_t *deltas; + alpm_list_t **data; + pthread_mutex_t *mutex; +} _alpm_delta_integrity_payload; We should stop typedef-ing all of these damn helper structs and just use named struct types.
+ +static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus) +{ + _alpm_delta_integrity_payload *payload = ptr; + pmtrans_t *trans = payload->trans; + alpm_list_t *deltas = payload->deltas; + alpm_list_t **data = payload->data; + pthread_mutex_t *mutex = payload->mutex; + size_t numdeltas = alpm_list_count(deltas); + + alpm_list_t *i; + size_t count = 0, range = (numdeltas * thread) / numcpus; + for (i = deltas; count < range; i = i->next, count++); + + int errors = 0; + range = (numdeltas * (thread + 1)) / numcpus; + pthread_mutex_lock(mutex); + for(i = deltas; i; i = i->next) { + pmdelta_t *d = alpm_list_getdata(i); + const char *filename = alpm_delta_get_filename(d); + const char *md5sum = alpm_delta_get_md5sum(d); + + /* Calculate md5sums in parallel */ + pthread_mutex_unlock(mutex); + int test = test_md5sum(trans, filename, md5sum); + pthread_mutex_lock(mutex); What mutex is this? And why does it get unlocked for only this part, but need to be locked at all elsewhere? Some clarity on the name would be good. + + if(test != 0) { + errors++; + *data = alpm_list_add(*data, strdup(filename)); + } + } + pthread_mutex_unlock(mutex); + + return errors; +} + +typedef struct { + pmtrans_t *trans; + alpm_list_t **data; + size_t numtargs; + size_t *current; + pthread_mutex_t *mutex; +} _alpm_integrity_payload; + +static int _alpm_check_integrity(void *ptr, int thread, int numcpus) _alpm_check_pkg_integrity to match naming scheme?
+{ + _alpm_integrity_payload *payload = ptr; + pmtrans_t *trans = payload->trans; + alpm_list_t **data = payload->data; + size_t numtargs = payload->numtargs; + size_t *current = payload->current; + pthread_mutex_t *mutex = payload->mutex; + + alpm_list_t *i; + size_t count = 0, range = (numtargs * thread) / numcpus; + for (i = trans->add; count < range; i = i->next, count++); + + int errors = 0; + range = (numtargs * (thread + 1)) / numcpus; + pthread_mutex_lock(mutex); + for(; count < range; i = i->next, count++) { + pmpkg_t *spkg = i->data; + if(spkg->origin == PKG_FROM_FILE) { + continue; /* pkg_load() has been already called, this package is valid */ + } + + const char *filename = alpm_pkg_get_filename(spkg); + const char *md5sum = alpm_pkg_get_md5sum(spkg); + + /* Calculate md5sums in parallel */ + pthread_mutex_unlock(mutex); + int test = test_md5sum(trans, filename, md5sum); + pthread_mutex_lock(mutex); + + if(test != 0) { + errors++; + *data = alpm_list_add(*data, strdup(filename)); + goto next; + } + /* load the package file and replace pkgcache entry with it in the target list */ + /* TODO: alpm_pkg_get_db() will not work on this target anymore */ + _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); + char *filepath = _alpm_filecache_find(filename); + pmpkg_t *pkgfile; + + /* Load packages in parallel */ + pthread_mutex_unlock(mutex); + int loaded = alpm_pkg_load(filepath, 1, &pkgfile); + pthread_mutex_lock(mutex); + + if(loaded != 0) { + _alpm_pkg_free(pkgfile); + errors++; + *data = alpm_list_add(*data, strdup(filename)); + FREE(filepath); + goto next; + } + FREE(filepath); + pkgfile->reason = spkg->reason; /* copy over install reason */ + i->data = pkgfile; + _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ + + next: + ++*current; + int percent = (*current * 100) / numtargs; + PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, + numtargs, *current); + } + 
pthread_mutex_unlock(mutex); + + return errors; +} + int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) { alpm_list_t *i, *j, *files = NULL; @@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) /* if we have deltas to work with */ if(handle->usedelta && deltas) { int ret = 0; - errors = 0; + /* Check integrity of deltas */ EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL);
- for(i = deltas; i; i = i->next) { - pmdelta_t *d = alpm_list_getdata(i); - const char *filename = alpm_delta_get_filename(d); - const char *md5sum = alpm_delta_get_md5sum(d); + static pthread_mutex_t delta_integrity_mutex = PTHREAD_MUTEX_INITIALIZER; + _alpm_delta_integrity_payload payload = { + .trans = trans, + .deltas = deltas, + .data = data, + .mutex = &delta_integrity_mutex + }; + + errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, &payload); + EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
- if(test_md5sum(trans, filename, md5sum) != 0) { - errors++; - *data = alpm_list_add(*data, strdup(filename)); - } - } if(errors) { pm_errno = PM_ERR_DLT_INVALID; goto error; } - EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
/* Use the deltas to generate the packages */ EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL); @@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) numtargs = alpm_list_count(trans->add); EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
- errors = 0; - for(i = trans->add; i; i = i->next, current++) { - pmpkg_t *spkg = i->data; - int percent = (current * 100) / numtargs; - if(spkg->origin == PKG_FROM_FILE) { - continue; /* pkg_load() has been already called, this package is valid */ - } - PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, - numtargs, current); - - const char *filename = alpm_pkg_get_filename(spkg); - const char *md5sum = alpm_pkg_get_md5sum(spkg); + static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER; + _alpm_integrity_payload payload = { + .trans = trans, + .data = data, + .numtargs = numtargs, + .current = &current, + .mutex = &integrity_mutex + }; + errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload);
- if(test_md5sum(trans, filename, md5sum) != 0) { - errors++; - *data = alpm_list_add(*data, strdup(filename)); - continue; - } - /* load the package file and replace pkgcache entry with it in the target list */ - /* TODO: alpm_pkg_get_db() will not work on this target anymore */ - _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); - char *filepath = _alpm_filecache_find(filename); - pmpkg_t *pkgfile; - if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) { - _alpm_pkg_free(pkgfile); - errors++; - *data = alpm_list_add(*data, strdup(filename)); - FREE(filepath); - continue; - } - FREE(filepath); - pkgfile->reason = spkg->reason; /* copy over install reason */ - i->data = pkgfile; - _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ - } PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100, numtargs, current); EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL); -- 1.7.4.1
I would love it if this were a two-part patch: first refactor the two functions out, then parallelize the calls to them. -Dan