On 19 February 2011 06:28, Nezmer <git@nezmer.info> wrote:
Actually, The sysconf() method works at least in FreeBSD and the man page says the sysconf interface is defined by POSIX.1
The sysconf() interface is specified by POSIX.1, but _SC_NPROCESSORS_ONLN is a non-standard extension.
You can look at how x264 guys implemented this. Check out x264_cpu_num_processors() in common/cpu.c in their source tree.
Seems to be basically a more platform-complete version of what I'm doing here [1]. But yeah, we'd need something like that if pacman is supposed to run on everything unix-based. I don't imagine we care much about Windows compatibility :)
I'm not sure hard-coding supported platforms through ifdefs would be accepted in pacman/libalpm.
There's code that does this already in lib/libalpm/diskspace.c, for example. It's not like this would limit the supported platforms either; we can just pretend there's only 1 core when we don't know how to count them. Anyway, I've made a quick and dirty patch as a proof of concept. It drops the time spent doing integrity checks from 9 seconds to 3 seconds on an 88MB update. The improvements I plan to maker are to hide some of the threading behind an _alpm_for_each_cpu() API, use it for delta integrity checks too, and divide the packages between the cores evenly in terms of package size rather than package count. And of course add the missing error checks. diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c index 859b8c9..595abd9 100644 --- a/lib/libalpm/sync.c +++ b/lib/libalpm/sync.c @@ -32,6 +32,7 @@ #include <unistd.h> #include <time.h> #include <limits.h> +#include <pthread.h> /* libalpm */ #include "sync.h" @@ -687,6 +688,79 @@ static int test_md5sum(pmtrans_t *trans, const char *filename, return(ret); } +/* Thread payload for checking package integrity */ +typedef struct { + size_t thread, numcpus, numtargs; + volatile size_t *current; + pmtrans_t *trans; + int *errors; + alpm_list_t **data; + + pthread_mutex_t *mutex; +} _alpm_integrity_payload; + +static void *_alpm_integrity_check_thread(void *ptr) +{ + _alpm_integrity_payload *payload = ptr; + pmtrans_t *trans = payload->trans; + + alpm_list_t *i; + size_t count = 0, range = (payload->numtargs * payload->thread) / payload->numcpus; + for(i = trans->add; count < range; i = i->next, count++); + + range = (payload->numtargs * (payload->thread + 1)) / payload->numcpus; + pthread_mutex_lock(payload->mutex); + for(; count < range; i = i->next, count++) { + pmpkg_t *spkg = i->data; + if(spkg->origin == PKG_FROM_FILE) { + continue; /* pkg_load() has been already called, this package is valid */ + } + + const char *filename = alpm_pkg_get_filename(spkg); + const char *md5sum = alpm_pkg_get_md5sum(spkg); + + pthread_mutex_unlock(payload->mutex); + int test = test_md5sum(trans, filename, md5sum); + pthread_mutex_lock(payload->mutex); + + if (test != 0) { + (*payload->errors)++; + *payload->data = alpm_list_add(*payload->data, strdup(filename)); + goto next; + } + /* load the package file and replace pkgcache entry with it in the target list */ + /* TODO: alpm_pkg_get_db() will not work on this target anymore */ + _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); + char *filepath = _alpm_filecache_find(filename); + pmpkg_t *pkgfile; + + pthread_mutex_unlock(payload->mutex); + int loaded = alpm_pkg_load(filepath, 1, &pkgfile); + pthread_mutex_lock(payload->mutex); + + if(loaded != 0) { + _alpm_pkg_free(pkgfile); + (*payload->errors)++; + *payload->data = alpm_list_add(*payload->data, strdup(filename)); + FREE(filepath); + goto next; + } + FREE(filepath); + pkgfile->reason = spkg->reason; /* copy over install reason */ + i->data = pkgfile; + _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ + + next: + (*payload->current)++; + int percent = (*payload->current * 100) / payload->numtargs; + PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, + payload->numtargs, *payload->current); + } + pthread_mutex_unlock(payload->mutex); + + return(NULL); +} + int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) { alpm_list_t *i, *j, *files = NULL; @@ -821,41 +895,36 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data) numtargs = alpm_list_count(trans->add); EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL); - errors = 0; - for(i = trans->add; i; i = i->next, current++) { - pmpkg_t *spkg = i->data; - int percent = (current * 100) / numtargs; - if(spkg->origin == PKG_FROM_FILE) { - continue; /* pkg_load() has been already called, this package is valid */ - } - PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent, - numtargs, current); + long numcpus = sysconf(_SC_NPROCESSORS_ONLN); + if(numcpus < 1) { + numcpus = 1; + } - const char *filename = alpm_pkg_get_filename(spkg); - const char *md5sum = alpm_pkg_get_md5sum(spkg); + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + _alpm_integrity_payload payload = { + .numcpus = numcpus, + .numtargs = numtargs, + .current = ¤t, + .trans = trans, + .errors = &errors, + .data = data, + .mutex = &mutex + }; - if(test_md5sum(trans, filename, md5sum) != 0) { - errors++; - *data = alpm_list_add(*data, strdup(filename)); - continue; + errors = 0; + { + pthread_t threads[numcpus]; + _alpm_integrity_payload payloads[numcpus]; + for(int i = 0; i < numcpus; i++) { + payloads[i] = payload; + payloads[i].thread = i; + pthread_create(&threads[i], NULL, _alpm_integrity_check_thread, &payloads[i]); } - /* load the package file and replace pkgcache entry with it in the target list */ - /* TODO: alpm_pkg_get_db() will not work on this target anymore */ - _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name); - char *filepath = _alpm_filecache_find(filename); - pmpkg_t *pkgfile; - if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) { - _alpm_pkg_free(pkgfile); - errors++; - *data = alpm_list_add(*data, strdup(filename)); - FREE(filepath); - continue; + for(int i = 0; i < numcpus; i++) { + pthread_join(threads[i], NULL); } - FREE(filepath); - pkgfile->reason = spkg->reason; /* copy over install reason */ - i->data = pkgfile; - _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */ } + PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100, numtargs, current); EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL); [1]: http://gitorious.org/dimension/dimension/blobs/master/libdimension/platform.... -- Tavian Barnes