[pacman-dev] Parallelize integrity checks
The following patches add a new internal API function, _alpm_for_each_cpu(), and use it to spread the integrity-checking workload for deltas and packages across the available CPUs. I didn't end up dividing the packages by size, because the logic gets uglier for what is probably a marginal benefit, so each thread gets roughly the same number of packages to check. To count the CPUs, it currently uses sysconf(_SC_NPROCESSORS_ONLN) when that is available. Support for detecting the number of CPUs on other platforms should probably be added by people who can test on those platforms; the git and x264 source code are both good examples of how to do it.
This function takes a callback which is called in a separate thread for every CPU core you have, to spread a parallel workload over the available cores. --- lib/libalpm/util.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/libalpm/util.h | 3 ++ 2 files changed, 56 insertions(+), 0 deletions(-) diff --git a/lib/libalpm/util.c b/lib/libalpm/util.c index 458f750..4e5cf85 100644 --- a/lib/libalpm/util.c +++ b/lib/libalpm/util.c @@ -39,6 +39,7 @@ #include <sys/stat.h> #include <sys/wait.h> #include <locale.h> /* setlocale */ +#include <pthread.h> /* libarchive */ #include <archive.h> @@ -938,6 +939,58 @@ long _alpm_parsedate(const char *line) return(atol(line)); } +typedef struct { + _alpm_for_each_cpu_callback callback; + void *ptr; + int thread, numcpus; + int ret; +} _alpm_thread_payload; + +static void *_alpm_thread_callback(void *ptr) +{ + _alpm_thread_payload *payload = ptr; + payload->ret = payload->callback(payload->ptr, payload->thread, payload->numcpus); + return NULL; +} + +static int _alpm_numcpus() +{ +#ifdef _SC_NPROCESSORS_ONLN + long numcpus = sysconf(_SC_NPROCESSORS_ONLN); + if(numcpus >= 1) { + return numcpus; + } +#endif + + return 1; +} + +int _alpm_for_each_cpu(_alpm_for_each_cpu_callback callback, void *ptr) +{ + int numcpus = _alpm_numcpus(); + + pthread_t threads[numcpus]; + _alpm_thread_payload payloads[numcpus]; + for(int i = 0; i < numcpus; i++) { + payloads[i].callback = callback; + payloads[i].ptr = ptr; + payloads[i].thread = i; + payloads[i].numcpus = numcpus; + + pthread_create(&threads[i], NULL, _alpm_thread_callback, &payloads[i]); + } + + int ret = 0; + for(int i = 0; i < numcpus; i++) { + pthread_join(threads[i], NULL); + if(payloads[i].ret != 0) { + ret = payloads[i].ret; + } + } + + return ret; +} + #ifndef HAVE_STRNDUP /* A quick and dirty implementation derived from glibc */ static size_t strnlen(const char *s, size_t max) diff --git a/lib/libalpm/util.h b/lib/libalpm/util.h index 015e9bf..72cd5d7 100644 --- 
a/lib/libalpm/util.h +++ b/lib/libalpm/util.h @@ -98,6 +98,9 @@ int _alpm_splitname(const char *target, pmpkg_t *pkg); unsigned long _alpm_hash_sdbm(const char *str); long _alpm_parsedate(const char *line); +typedef int (*_alpm_for_each_cpu_callback)(void *ptr, int thread, int numcpus); +int _alpm_for_each_cpu(_alpm_for_each_cpu_callback callback, void *ptr); + #ifndef HAVE_STRSEP char *strsep(char **, const char *); #endif -- 1.7.4.1
Use the new _alpm_for_each_cpu() API to perform integrity checks in parallel. This speeds up integrity checks drastically on multi-core machines. --- lib/libalpm/sync.c | 186 +++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 141 insertions(+), 45 deletions(-) diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c index 859b8c9..52c87e1 100644 --- a/lib/libalpm/sync.c +++ b/lib/libalpm/sync.c @@ -32,6 +32,7 @@ #include <unistd.h> #include <time.h> #include <limits.h> +#include <pthread.h> /* libalpm */ #include "sync.h" @@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename, return(ret); } +typedef struct { + pmtrans_t *trans; + alpm_list_t *deltas; + alpm_list_t **data; + pthread_mutex_t *mutex; +} _alpm_delta_integrity_payload; + +static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus) +{ + _alpm_delta_integrity_payload *payload = ptr; + pmtrans_t *trans = payload->trans; + alpm_list_t *deltas = payload->deltas; + alpm_list_t **data = payload->data; + pthread_mutex_t *mutex = payload->mutex; + size_t numdeltas = alpm_list_count(deltas); + + alpm_list_t *i; + size_t count = 0, range = (numdeltas * thread) / numcpus; + for (i = deltas; count < range; i = i->next, count++); + + int errors = 0; + range = (numdeltas * (thread + 1)) / numcpus; + pthread_mutex_lock(mutex); + for(i = deltas; i; i = i->next) { + pmdelta_t *d = alpm_list_getdata(i); + const char *filename = alpm_delta_get_filename(d); + const char *md5sum = alpm_delta_get_md5sum(d); + + /* Calculate md5sums in parallel */ + pthread_mutex_unlock(mutex); + int test = test_md5sum(trans, filename, md5sum); + pthread_mutex_lock(mutex); + + if(test != 0) { + errors++; + *data = alpm_list_add(*data, strdup(filename)); + } + } + pthread_mutex_unlock(mutex); + + return errors; +} + +typedef struct { + pmtrans_t *trans; + alpm_list_t **data; + size_t numtargs; + size_t *current; + pthread_mutex_t *mutex; +} _alpm_integrity_payload; + +static int 
_alpm_check_integrity(void *ptr, int thread, int numcpus) +{ + _alpm_integrity_payload *payload = ptr; + pmtrans_t *trans = payload->trans; + alpm_list_t **data = payload->data; + size_t numtargs = payload->numtargs; + size_t *current = payload->current; + pthread_mutex_t *mutex = payload->mutex; + + alpm_list_t *i; + size_t count = 0, range = (numtargs * thread) / numcpus; + for (i = trans->add; count < range; i = i->next, count++); + + int errors = 0; + range = (numtargs * (thread + 1)) / numcpus; + pthread_mutex_lock(mutex); + for(; count < range; i = i->next, count++) { + pmpkg_t *spkg = i->data; + if(spkg->origin == PKG_FROM_FILE) { + continue; /* pkg_load() has been already called, this package is valid */ + } + + const char *filename = alpm_pkg_get_filename(spkg); + const char *md5sum = alpm_pkg_get_md5sum(spkg); + + /* Calculate md5sums in parallel */ + pthread_mutex_unlock(mutex); + int test = test_md5sum(trans, filename, md5sum); + pthread_mutex_lock(mutex); + + if(test != 0) { + errors++; + *data = alpm_list_add(*data, strdup(filename)); + goto next; + } + /* load the package file and replace pkgcache entry with it in the target list */ + /* TODO: alpm_pkg_get_db() will not work on this target anymore */ + _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); + char *filepath = _alpm_filecache_find(filename); + pmpkg_t *pkgfile; + + /* Load packages in parallel */ + pthread_mutex_unlock(mutex); + int loaded = alpm_pkg_load(filepath, 1, &pkgfile); + pthread_mutex_lock(mutex); + + if(loaded != 0) { + _alpm_pkg_free(pkgfile); + errors++; + *data = alpm_list_add(*data, strdup(filename)); + FREE(filepath); + goto next; + } + FREE(filepath); + pkgfile->reason = spkg->reason; /* copy over install reason */ + i->data = pkgfile; + _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ + + next: + ++*current; + int percent = (*current * 100) / numtargs; + PROGRESS(trans, 
PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, + numtargs, *current); + } + pthread_mutex_unlock(mutex); + + return errors; +} + int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) { alpm_list_t *i, *j, *files = NULL; @@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) /* if we have deltas to work with */ if(handle->usedelta && deltas) { int ret = 0; - errors = 0; + /* Check integrity of deltas */ EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL); - for(i = deltas; i; i = i->next) { - pmdelta_t *d = alpm_list_getdata(i); - const char *filename = alpm_delta_get_filename(d); - const char *md5sum = alpm_delta_get_md5sum(d); + static pthread_mutex_t delta_integrity_mutex = PTHREAD_MUTEX_INITIALIZER; + _alpm_delta_integrity_payload payload = { + .trans = trans, + .deltas = deltas, + .data = data, + .mutex = &delta_integrity_mutex + }; + + errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, &payload); + EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL); - if(test_md5sum(trans, filename, md5sum) != 0) { - errors++; - *data = alpm_list_add(*data, strdup(filename)); - } - } if(errors) { pm_errno = PM_ERR_DLT_INVALID; goto error; } - EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL); /* Use the deltas to generate the packages */ EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL); @@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) numtargs = alpm_list_count(trans->add); EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL); - errors = 0; - for(i = trans->add; i; i = i->next, current++) { - pmpkg_t *spkg = i->data; - int percent = (current * 100) / numtargs; - if(spkg->origin == PKG_FROM_FILE) { - continue; /* pkg_load() has been already called, this package is valid */ - } - PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, - numtargs, current); - - const char *filename = 
alpm_pkg_get_filename(spkg); - const char *md5sum = alpm_pkg_get_md5sum(spkg); + static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER; + _alpm_integrity_payload payload = { + .trans = trans, + .data = data, + .numtargs = numtargs, + .current = ¤t, + .mutex = &integrity_mutex + }; + errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload); - if(test_md5sum(trans, filename, md5sum) != 0) { - errors++; - *data = alpm_list_add(*data, strdup(filename)); - continue; - } - /* load the package file and replace pkgcache entry with it in the target list */ - /* TODO: alpm_pkg_get_db() will not work on this target anymore */ - _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); - char *filepath = _alpm_filecache_find(filename); - pmpkg_t *pkgfile; - if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) { - _alpm_pkg_free(pkgfile); - errors++; - *data = alpm_list_add(*data, strdup(filename)); - FREE(filepath); - continue; - } - FREE(filepath); - pkgfile->reason = spkg->reason; /* copy over install reason */ - i->data = pkgfile; - _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ - } PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100, numtargs, current); EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL); -- 1.7.4.1
On 02/21/2011 11:38 PM, Tavian Barnes wrote:
Use the new _alpm_for_each_cpu() API to perform integrity checks in parallel. This speeds up integrity checks drastically on multi-core machines. ---
Have you thought about or looked into doing integrity checks in the background as individual packages finish downloading?
On 21 February 2011 18:10, Jakob Gruber <jakob.gruber@gmail.com> wrote:
On 02/21/2011 11:38 PM, Tavian Barnes wrote:
Use the new _alpm_for_each_cpu() API to perform integrity checks in parallel. This speeds up integrity checks drastically on multi-core machines. ---
Have you thought about or looked into doing integrity checks in the background as individual packages finish downloading?
No, but now that I think about it, that's obviously a good idea. Integrity checks may still need to happen where they do now, though, for example if the packages were downloaded previously and are only now being installed. That said, there's no reason the integrity checks couldn't happen in the background while we're waiting for the user to confirm the installation. -- Tavian Barnes
On Mon, Feb 21, 2011 at 6:30 PM, Tavian Barnes <tavianator@tavianator.com> wrote:
On 21 February 2011 18:10, Jakob Gruber <jakob.gruber@gmail.com> wrote:
On 02/21/2011 11:38 PM, Tavian Barnes wrote:
Use the new _alpm_for_each_cpu() API to perform integrity checks in parallel. This speeds up integrity checks drastically on multi-core machines. ---
Have you thought about or looked into doing integrity checks in the background as individual packages finish downloading?
No, but now that I think about it, that's obviously a good idea. Integrity checks may still need to happen where they do now, though, for example if the packages were downloaded previously and are only now being installed. That said, there's no reason the integrity checks couldn't happen in the background while we're waiting for the user to confirm the installation.
-1 from me. This just isn't worth the nightmare it will probably end up being. -Dan
On Mon, Feb 21, 2011 at 4:38 PM, Tavian Barnes <tavianator@gmail.com> wrote:
Use the new _alpm_for_each_cpu() API to perform integrity checks in parallel. This speeds up integrity checks drastically on multi-core machines. --- lib/libalpm/sync.c | 186 +++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 141 insertions(+), 45 deletions(-)
diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c index 859b8c9..52c87e1 100644 --- a/lib/libalpm/sync.c +++ b/lib/libalpm/sync.c @@ -32,6 +32,7 @@ #include <unistd.h> #include <time.h> #include <limits.h> +#include <pthread.h>
/* libalpm */ #include "sync.h" @@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename, return(ret); }
+typedef struct { + pmtrans_t *trans; + alpm_list_t *deltas; + alpm_list_t **data; + pthread_mutex_t *mutex; +} _alpm_delta_integrity_payload; We should stop typedef-ing all of these damn helper structs and just use named struct types.
+ +static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus) +{ + _alpm_delta_integrity_payload *payload = ptr; + pmtrans_t *trans = payload->trans; + alpm_list_t *deltas = payload->deltas; + alpm_list_t **data = payload->data; + pthread_mutex_t *mutex = payload->mutex; + size_t numdeltas = alpm_list_count(deltas); + + alpm_list_t *i; + size_t count = 0, range = (numdeltas * thread) / numcpus; + for (i = deltas; count < range; i = i->next, count++); + + int errors = 0; + range = (numdeltas * (thread + 1)) / numcpus; + pthread_mutex_lock(mutex); + for(i = deltas; i; i = i->next) { + pmdelta_t *d = alpm_list_getdata(i); + const char *filename = alpm_delta_get_filename(d); + const char *md5sum = alpm_delta_get_md5sum(d); + + /* Calculate md5sums in parallel */ + pthread_mutex_unlock(mutex); + int test = test_md5sum(trans, filename, md5sum); + pthread_mutex_lock(mutex); What mutex is this? And why does it get unlocked for only this part, but need to be locked at all elsewhere? Some clarity on the name would be good. + + if(test != 0) { + errors++; + *data = alpm_list_add(*data, strdup(filename)); + } + } + pthread_mutex_unlock(mutex); + + return errors; +} + +typedef struct { + pmtrans_t *trans; + alpm_list_t **data; + size_t numtargs; + size_t *current; + pthread_mutex_t *mutex; +} _alpm_integrity_payload; + +static int _alpm_check_integrity(void *ptr, int thread, int numcpus) _alpm_check_pkg_integrity to match naming scheme?
+{ + _alpm_integrity_payload *payload = ptr; + pmtrans_t *trans = payload->trans; + alpm_list_t **data = payload->data; + size_t numtargs = payload->numtargs; + size_t *current = payload->current; + pthread_mutex_t *mutex = payload->mutex; + + alpm_list_t *i; + size_t count = 0, range = (numtargs * thread) / numcpus; + for (i = trans->add; count < range; i = i->next, count++); + + int errors = 0; + range = (numtargs * (thread + 1)) / numcpus; + pthread_mutex_lock(mutex); + for(; count < range; i = i->next, count++) { + pmpkg_t *spkg = i->data; + if(spkg->origin == PKG_FROM_FILE) { + continue; /* pkg_load() has been already called, this package is valid */ + } + + const char *filename = alpm_pkg_get_filename(spkg); + const char *md5sum = alpm_pkg_get_md5sum(spkg); + + /* Calculate md5sums in parallel */ + pthread_mutex_unlock(mutex); + int test = test_md5sum(trans, filename, md5sum); + pthread_mutex_lock(mutex); + + if(test != 0) { + errors++; + *data = alpm_list_add(*data, strdup(filename)); + goto next; + } + /* load the package file and replace pkgcache entry with it in the target list */ + /* TODO: alpm_pkg_get_db() will not work on this target anymore */ + _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); + char *filepath = _alpm_filecache_find(filename); + pmpkg_t *pkgfile; + + /* Load packages in parallel */ + pthread_mutex_unlock(mutex); + int loaded = alpm_pkg_load(filepath, 1, &pkgfile); + pthread_mutex_lock(mutex); + + if(loaded != 0) { + _alpm_pkg_free(pkgfile); + errors++; + *data = alpm_list_add(*data, strdup(filename)); + FREE(filepath); + goto next; + } + FREE(filepath); + pkgfile->reason = spkg->reason; /* copy over install reason */ + i->data = pkgfile; + _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ + + next: + ++*current; + int percent = (*current * 100) / numtargs; + PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, + numtargs, *current); + } + 
pthread_mutex_unlock(mutex); + + return errors; +} + int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) { alpm_list_t *i, *j, *files = NULL; @@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) /* if we have deltas to work with */ if(handle->usedelta && deltas) { int ret = 0; - errors = 0; + /* Check integrity of deltas */ EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL);
- for(i = deltas; i; i = i->next) { - pmdelta_t *d = alpm_list_getdata(i); - const char *filename = alpm_delta_get_filename(d); - const char *md5sum = alpm_delta_get_md5sum(d); + static pthread_mutex_t delta_integrity_mutex = PTHREAD_MUTEX_INITIALIZER; + _alpm_delta_integrity_payload payload = { + .trans = trans, + .deltas = deltas, + .data = data, + .mutex = &delta_integrity_mutex + }; + + errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, &payload); + EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
- if(test_md5sum(trans, filename, md5sum) != 0) { - errors++; - *data = alpm_list_add(*data, strdup(filename)); - } - } if(errors) { pm_errno = PM_ERR_DLT_INVALID; goto error; } - EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
/* Use the deltas to generate the packages */ EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL); @@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) numtargs = alpm_list_count(trans->add); EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
- errors = 0; - for(i = trans->add; i; i = i->next, current++) { - pmpkg_t *spkg = i->data; - int percent = (current * 100) / numtargs; - if(spkg->origin == PKG_FROM_FILE) { - continue; /* pkg_load() has been already called, this package is valid */ - } - PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, - numtargs, current); - - const char *filename = alpm_pkg_get_filename(spkg); - const char *md5sum = alpm_pkg_get_md5sum(spkg); + static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER; + _alpm_integrity_payload payload = { + .trans = trans, + .data = data, + .numtargs = numtargs, + .current = ¤t, + .mutex = &integrity_mutex + }; + errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload);
- if(test_md5sum(trans, filename, md5sum) != 0) { - errors++; - *data = alpm_list_add(*data, strdup(filename)); - continue; - } - /* load the package file and replace pkgcache entry with it in the target list */ - /* TODO: alpm_pkg_get_db() will not work on this target anymore */ - _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); - char *filepath = _alpm_filecache_find(filename); - pmpkg_t *pkgfile; - if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) { - _alpm_pkg_free(pkgfile); - errors++; - *data = alpm_list_add(*data, strdup(filename)); - FREE(filepath); - continue; - } - FREE(filepath); - pkgfile->reason = spkg->reason; /* copy over install reason */ - i->data = pkgfile; - _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ - } PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100, numtargs, current); EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL); -- 1.7.4.1
I would love it if this were a two-part patch: first refactor the two functions out, then parallelize the calls to them. -Dan
participants (4)
-
Dan McGee
-
Jakob Gruber
-
Tavian Barnes
-
Tavian Barnes