[pacman-dev] [PATCH 2/2] Parallelize integrity checks.
Tavian Barnes
tavianator at gmail.com
Mon Feb 21 17:38:35 EST 2011
Use the new _alpm_for_each_cpu() API to perform integrity checks in
parallel. This speeds up integrity checks drastically on multi-core
machines.
---
lib/libalpm/sync.c | 186 +++++++++++++++++++++++++++++++++++++++-------------
1 files changed, 141 insertions(+), 45 deletions(-)
diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
index 859b8c9..52c87e1 100644
--- a/lib/libalpm/sync.c
+++ b/lib/libalpm/sync.c
@@ -32,6 +32,7 @@
#include <unistd.h>
#include <time.h>
#include <limits.h>
+#include <pthread.h>
/* libalpm */
#include "sync.h"
@@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename,
return(ret);
}
+/* Shared state handed to every _alpm_check_delta_integrity() worker. */
+typedef struct {
+ pmtrans_t *trans; /* passed through to test_md5sum() */
+ alpm_list_t *deltas; /* head of the list of deltas to verify */
+ alpm_list_t **data; /* filenames of deltas that fail the md5sum test are appended here */
+ pthread_mutex_t *mutex; /* locked by each worker around its loop, released only while test_md5sum() runs */
+} _alpm_delta_integrity_payload;
+
+/*
+ * Worker callback for _alpm_for_each_cpu(): verify the md5sums of this
+ * worker's share of the delta list.
+ *
+ * The list is split into numcpus contiguous slices; worker number `thread`
+ * handles elements [numdeltas * thread / numcpus,
+ * numdeltas * (thread + 1) / numcpus).
+ *
+ * ptr     - an _alpm_delta_integrity_payload
+ * thread  - index of this worker; the slice arithmetic assumes
+ *           0 <= thread < numcpus
+ * numcpus - total number of workers
+ *
+ * Returns the number of deltas in this worker's slice that failed the
+ * md5sum test. The filename of each failing delta is appended (as a
+ * strdup()ed copy) to the payload's data list.
+ */
+static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus)
+{
+	_alpm_delta_integrity_payload *payload = ptr;
+	pmtrans_t *trans = payload->trans;
+	alpm_list_t *deltas = payload->deltas;
+	alpm_list_t **data = payload->data;
+	pthread_mutex_t *mutex = payload->mutex;
+	size_t numdeltas = alpm_list_count(deltas);
+
+	/* Skip ahead to the first delta of this worker's slice. */
+	alpm_list_t *i;
+	size_t count = 0, range = (numdeltas * thread) / numcpus;
+	for(i = deltas; count < range; i = i->next, count++);
+
+	int errors = 0;
+	range = (numdeltas * (thread + 1)) / numcpus;
+	pthread_mutex_lock(mutex);
+	/* Continue from the slice start and stop at the slice end. Restarting
+	 * from the list head here would make every worker hash every delta and
+	 * record each failure once per worker. */
+	for(; count < range; i = i->next, count++) {
+		pmdelta_t *d = alpm_list_getdata(i);
+		const char *filename = alpm_delta_get_filename(d);
+		const char *md5sum = alpm_delta_get_md5sum(d);
+
+		/* Calculate md5sums in parallel */
+		pthread_mutex_unlock(mutex);
+		int test = test_md5sum(trans, filename, md5sum);
+		pthread_mutex_lock(mutex);
+
+		/* The shared failure list is only touched with the mutex held. */
+		if(test != 0) {
+			errors++;
+			*data = alpm_list_add(*data, strdup(filename));
+		}
+	}
+	pthread_mutex_unlock(mutex);
+
+	return errors;
+}
+
+/* Shared state handed to every _alpm_check_integrity() worker. */
+typedef struct {
+ pmtrans_t *trans; /* transaction whose trans->add target list is checked */
+ alpm_list_t **data; /* filenames of targets that fail the md5sum test or fail to load are appended here */
+ size_t numtargs; /* number of targets; used to slice the list and to compute the progress percentage */
+ size_t *current; /* counter shared by all workers, incremented with the mutex held */
+ pthread_mutex_t *mutex; /* locked by each worker around its loop, released only while test_md5sum() and alpm_pkg_load() run */
+} _alpm_integrity_payload;
+
+/*
+ * Worker callback for _alpm_for_each_cpu(): verify and load this worker's
+ * share of the transaction's target packages.
+ *
+ * trans->add is split into numcpus contiguous slices; worker number
+ * `thread` handles elements [numtargs * thread / numcpus,
+ * numtargs * (thread + 1) / numcpus). Targets whose origin is
+ * PKG_FROM_FILE are skipped. Every other target has its md5sum tested and,
+ * on success, its package file is loaded and replaces the target's entry
+ * in the list.
+ *
+ * ptr     - an _alpm_integrity_payload
+ * thread  - index of this worker; the slice arithmetic assumes
+ *           0 <= thread < numcpus
+ * numcpus - total number of workers
+ *
+ * Returns the number of targets in this worker's slice that failed the
+ * md5sum test or could not be loaded. The filename of each failing target
+ * is appended (as a strdup()ed copy) to the payload's data list.
+ */
+static int _alpm_check_integrity(void *ptr, int thread, int numcpus)
+{
+	_alpm_integrity_payload *payload = ptr;
+	pmtrans_t *trans = payload->trans;
+	alpm_list_t **data = payload->data;
+	size_t numtargs = payload->numtargs;
+	size_t *current = payload->current;
+	pthread_mutex_t *mutex = payload->mutex;
+
+	/* Skip ahead to the first target of this worker's slice. */
+	alpm_list_t *i;
+	size_t count = 0, range = (numtargs * thread) / numcpus;
+	for(i = trans->add; count < range; i = i->next, count++);
+
+	int errors = 0;
+	range = (numtargs * (thread + 1)) / numcpus;
+	pthread_mutex_lock(mutex);
+	for(; count < range; i = i->next, count++) {
+		pmpkg_t *spkg = i->data;
+		if(spkg->origin == PKG_FROM_FILE) {
+			/* pkg_load() has been already called, this package is valid.
+			 * Still count it, without a progress callback, so the shared
+			 * counter reaches numtargs once every worker is done. */
+			++*current;
+			continue;
+		}
+
+		const char *filename = alpm_pkg_get_filename(spkg);
+		const char *md5sum = alpm_pkg_get_md5sum(spkg);
+
+		/* Calculate md5sums in parallel */
+		pthread_mutex_unlock(mutex);
+		int test = test_md5sum(trans, filename, md5sum);
+		pthread_mutex_lock(mutex);
+
+		if(test != 0) {
+			errors++;
+			*data = alpm_list_add(*data, strdup(filename));
+			goto next;
+		}
+		/* load the package file and replace pkgcache entry with it in the target list */
+		/* TODO: alpm_pkg_get_db() will not work on this target anymore */
+		_alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
+		char *filepath = _alpm_filecache_find(filename);
+		/* Start from NULL so the failure path never hands an indeterminate
+		 * pointer to _alpm_pkg_free(), in case alpm_pkg_load() returns an
+		 * error before storing a result. */
+		pmpkg_t *pkgfile = NULL;
+
+		/* Load packages in parallel */
+		pthread_mutex_unlock(mutex);
+		int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
+		pthread_mutex_lock(mutex);
+
+		if(loaded != 0) {
+			_alpm_pkg_free(pkgfile);
+			errors++;
+			*data = alpm_list_add(*data, strdup(filename));
+			FREE(filepath);
+			goto next;
+		}
+		FREE(filepath);
+		pkgfile->reason = spkg->reason; /* copy over install reason */
+		i->data = pkgfile;
+		_alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
+
+	next:
+		/* The mutex is held here, so the shared counter update and the
+		 * progress callback are serialized across workers. */
+		++*current;
+		int percent = (*current * 100) / numtargs;
+		PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
+				numtargs, *current);
+	}
+	pthread_mutex_unlock(mutex);
+
+	return errors;
+}
+
int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
{
alpm_list_t *i, *j, *files = NULL;
@@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
/* if we have deltas to work with */
if(handle->usedelta && deltas) {
int ret = 0;
- errors = 0;
+
/* Check integrity of deltas */
EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL);
- for(i = deltas; i; i = i->next) {
- pmdelta_t *d = alpm_list_getdata(i);
- const char *filename = alpm_delta_get_filename(d);
- const char *md5sum = alpm_delta_get_md5sum(d);
+ static pthread_mutex_t delta_integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
+ _alpm_delta_integrity_payload payload = {
+ .trans = trans,
+ .deltas = deltas,
+ .data = data,
+ .mutex = &delta_integrity_mutex
+ };
+
+ errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, &payload);
+ EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
- if(test_md5sum(trans, filename, md5sum) != 0) {
- errors++;
- *data = alpm_list_add(*data, strdup(filename));
- }
- }
if(errors) {
pm_errno = PM_ERR_DLT_INVALID;
goto error;
}
- EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
/* Use the deltas to generate the packages */
EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL);
@@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
numtargs = alpm_list_count(trans->add);
EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
- errors = 0;
- for(i = trans->add; i; i = i->next, current++) {
- pmpkg_t *spkg = i->data;
- int percent = (current * 100) / numtargs;
- if(spkg->origin == PKG_FROM_FILE) {
- continue; /* pkg_load() has been already called, this package is valid */
- }
- PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
- numtargs, current);
-
- const char *filename = alpm_pkg_get_filename(spkg);
- const char *md5sum = alpm_pkg_get_md5sum(spkg);
+ static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
+ _alpm_integrity_payload payload = {
+ .trans = trans,
+ .data = data,
+ .numtargs = numtargs,
+ .current = &current,
+ .mutex = &integrity_mutex
+ };
+ errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload);
- if(test_md5sum(trans, filename, md5sum) != 0) {
- errors++;
- *data = alpm_list_add(*data, strdup(filename));
- continue;
- }
- /* load the package file and replace pkgcache entry with it in the target list */
- /* TODO: alpm_pkg_get_db() will not work on this target anymore */
- _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
- char *filepath = _alpm_filecache_find(filename);
- pmpkg_t *pkgfile;
- if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
- _alpm_pkg_free(pkgfile);
- errors++;
- *data = alpm_list_add(*data, strdup(filename));
- FREE(filepath);
- continue;
- }
- FREE(filepath);
- pkgfile->reason = spkg->reason; /* copy over install reason */
- i->data = pkgfile;
- _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
- }
PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
numtargs, current);
EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);
--
1.7.4.1
More information about the pacman-dev
mailing list