[pacman-dev] [PATCH 2/2] Parallelize integrity checks.

Tavian Barnes tavianator at gmail.com
Mon Feb 21 17:38:35 EST 2011


Use the new _alpm_for_each_cpu() API to perform integrity checks in
parallel.  This speeds up integrity checks drastically on multi-core
machines.
---
 lib/libalpm/sync.c |  186 +++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 141 insertions(+), 45 deletions(-)

diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
index 859b8c9..52c87e1 100644
--- a/lib/libalpm/sync.c
+++ b/lib/libalpm/sync.c
@@ -32,6 +32,7 @@
 #include <unistd.h>
 #include <time.h>
 #include <limits.h>
+#include <pthread.h>
 
 /* libalpm */
 #include "sync.h"
@@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename,
 	return(ret);
 }
 
+typedef struct { /* shared argument block for _alpm_check_delta_integrity() */
+	pmtrans_t *trans; /* forwarded to test_md5sum() */
+	alpm_list_t *deltas; /* list of pmdelta_t entries to check */
+	alpm_list_t **data; /* receives strdup()ed filenames of failed deltas */
+	pthread_mutex_t *mutex; /* held by workers except around test_md5sum() */
+} _alpm_delta_integrity_payload;
+
+/* _alpm_for_each_cpu() worker: md5-check this thread's slice of deltas; returns the failure count. */
+static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus)
+{
+	_alpm_delta_integrity_payload *payload = ptr;
+	pmtrans_t *trans = payload->trans;
+	alpm_list_t *deltas = payload->deltas;
+	alpm_list_t **data = payload->data;
+	pthread_mutex_t *mutex = payload->mutex;
+	size_t numdeltas = alpm_list_count(deltas);
+
+	/* Skip ahead to the first delta of this thread's slice */
+	alpm_list_t *i;
+	size_t count = 0, range = (numdeltas * thread) / numcpus;
+	for (i = deltas; count < range; i = i->next, count++);
+
+	int errors = 0;
+	range = (numdeltas * (thread + 1)) / numcpus;
+	pthread_mutex_lock(mutex);
+	/* Bounded by range so that no delta is checked by more than one thread */
+	for(; count < range; i = i->next, count++) {
+		pmdelta_t *d = alpm_list_getdata(i);
+		const char *filename = alpm_delta_get_filename(d);
+		const char *md5sum = alpm_delta_get_md5sum(d);
+		/* Calculate md5sums in parallel */
+		pthread_mutex_unlock(mutex);
+		int test = test_md5sum(trans, filename, md5sum);
+		pthread_mutex_lock(mutex);
+		if(test != 0) {
+			errors++;
+			/* *data is shared between workers; only touched with the mutex held */
+			*data = alpm_list_add(*data, strdup(filename));
+		}
+	}
+	pthread_mutex_unlock(mutex);
+	return errors;
+}
+
+typedef struct { /* shared argument block for _alpm_check_integrity() */
+	pmtrans_t *trans; /* transaction whose add list is checked */
+	alpm_list_t **data; /* receives strdup()ed filenames of failed targets */
+	size_t numtargs; /* number of targets; used to split the work and for progress */
+	size_t *current; /* shared count of processed targets, updated under mutex */
+	pthread_mutex_t *mutex; /* released by workers only while hashing/loading */
+} _alpm_integrity_payload;
+
+/* _alpm_for_each_cpu() worker: verify and load this thread's slice of
+ * trans->add. Returns the number of targets that failed md5 or loading. */
+static int _alpm_check_integrity(void *ptr, int thread, int numcpus)
+{
+	_alpm_integrity_payload *payload = ptr;
+	pmtrans_t *trans = payload->trans;
+	alpm_list_t **data = payload->data;
+	size_t numtargs = payload->numtargs;
+	size_t *current = payload->current;
+	pthread_mutex_t *mutex = payload->mutex;
+
+	/* Skip ahead to the first target of this thread's slice */
+	alpm_list_t *i;
+	size_t count = 0, range = (numtargs * thread) / numcpus;
+	for (i = trans->add; count < range; i = i->next, count++);
+
+	int errors = 0;
+	range = (numtargs * (thread + 1)) / numcpus;
+	pthread_mutex_lock(mutex);
+	for(; count < range; i = i->next, count++) {
+		pmpkg_t *spkg = i->data;
+		if(spkg->origin == PKG_FROM_FILE) {
+			++*current; /* still counted, as when the serial loop advanced current */
+			continue; /* pkg_load() has been already called, this package is valid */
+		}
+		const char *filename = alpm_pkg_get_filename(spkg);
+		const char *md5sum = alpm_pkg_get_md5sum(spkg);
+		/* Calculate md5sums in parallel */
+		pthread_mutex_unlock(mutex);
+		int test = test_md5sum(trans, filename, md5sum);
+		pthread_mutex_lock(mutex);
+		if(test != 0) {
+			errors++;
+			*data = alpm_list_add(*data, strdup(filename));
+			goto next;
+		}
+		/* load the package file and replace pkgcache entry with it in the target list */
+		/* TODO: alpm_pkg_get_db() will not work on this target anymore */
+		_alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
+		char *filepath = _alpm_filecache_find(filename);
+		pmpkg_t *pkgfile = NULL; /* NOTE(review): assumes _alpm_pkg_free() accepts NULL */
+
+		/* Load packages in parallel */
+		pthread_mutex_unlock(mutex);
+		int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
+		pthread_mutex_lock(mutex);
+		if(loaded != 0) {
+			_alpm_pkg_free(pkgfile);
+			errors++;
+			*data = alpm_list_add(*data, strdup(filename));
+			FREE(filepath);
+			goto next;
+		}
+		FREE(filepath);
+		pkgfile->reason = spkg->reason; /* copy over install reason */
+		i->data = pkgfile;
+		_alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
+
+	next:
+		++*current;
+		int percent = (*current * 100) / numtargs;
+		PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
+				numtargs, *current);
+	}
+	pthread_mutex_unlock(mutex);
+
+	return errors;
+}
+
 int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 {
 	alpm_list_t *i, *j, *files = NULL;
@@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 	/* if we have deltas to work with */
 	if(handle->usedelta && deltas) {
 		int ret = 0;
-		errors = 0;
+
 		/* Check integrity of deltas */
 		EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL);
 
-		for(i = deltas; i; i = i->next) {
-			pmdelta_t *d = alpm_list_getdata(i);
-			const char *filename = alpm_delta_get_filename(d);
-			const char *md5sum = alpm_delta_get_md5sum(d);
+		static pthread_mutex_t delta_integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
+		_alpm_delta_integrity_payload payload = {
+			.trans	= trans,
+			.deltas = deltas,
+			.data	= data,
+			.mutex	= &delta_integrity_mutex
+		};
+
+		errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, &payload);
+		EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
 
-			if(test_md5sum(trans, filename, md5sum) != 0) {
-				errors++;
-				*data = alpm_list_add(*data, strdup(filename));
-			}
-		}
 		if(errors) {
 			pm_errno = PM_ERR_DLT_INVALID;
 			goto error;
 		}
-		EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
 
 		/* Use the deltas to generate the packages */
 		EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL);
@@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 	numtargs = alpm_list_count(trans->add);
 	EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
 
-	errors = 0;
-	for(i = trans->add; i; i = i->next, current++) {
-		pmpkg_t *spkg = i->data;
-		int percent = (current * 100) / numtargs;
-		if(spkg->origin == PKG_FROM_FILE) {
-			continue; /* pkg_load() has been already called, this package is valid */
-		}
-		PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
-				numtargs, current);
-
-		const char *filename = alpm_pkg_get_filename(spkg);
-		const char *md5sum = alpm_pkg_get_md5sum(spkg);
+	static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
+	_alpm_integrity_payload payload = {
+		.trans	  = trans,
+		.data	  = data,
+		.numtargs = numtargs,
+		.current  = &current,
+		.mutex	  = &integrity_mutex
+	};
+	errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload);
 
-		if(test_md5sum(trans, filename, md5sum) != 0) {
-			errors++;
-			*data = alpm_list_add(*data, strdup(filename));
-			continue;
-		}
-		/* load the package file and replace pkgcache entry with it in the target list */
-		/* TODO: alpm_pkg_get_db() will not work on this target anymore */
-		_alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
-		char *filepath = _alpm_filecache_find(filename);
-		pmpkg_t *pkgfile;
-		if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
-			_alpm_pkg_free(pkgfile);
-			errors++;
-			*data = alpm_list_add(*data, strdup(filename));
-			FREE(filepath);
-			continue;
-		}
-		FREE(filepath);
-		pkgfile->reason = spkg->reason; /* copy over install reason */
-		i->data = pkgfile;
-		_alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
-	}
 	PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
 			numtargs, current);
 	EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);
-- 
1.7.4.1



More information about the pacman-dev mailing list