[pacman-dev] Paralellising integrity checks

Tavian Barnes tavianator at tavianator.com
Sat Feb 19 16:24:48 EST 2011


On 19 February 2011 06:28, Nezmer <git at nezmer.info> wrote:
> Actually, The sysconf() method  works at least in FreeBSD and the man page
> says the sysconf interface is defined by POSIX.1

The sysconf() interface is specified by POSIX.1, but
_SC_NPROCESSORS_ONLN is a non-standard extension.

>> You can look at how x264 guys implemented this. Check out
>> x264_cpu_num_processors() in common/cpu.c in their source tree.

Seems to be basically a more platform-complete version of what I'm
doing here [1].  But yeah, we'd need something like that if pacman is
supposed to run on everything unix-based.  I don't imagine we care
much about Windows compatibility :)

>> I'm not sure hard-coding supported platforms through ifdefs would be
>> accepted in pacman/libalpm.

There's code that does this already in lib/libalpm/diskspace.c, for
example.  It's not like this would limit the supported platforms
either; we can just pretend there's only 1 core when we don't know how
to count them.

Anyway, I've made a quick and dirty patch as a proof of concept.  It
drops the time spent doing integrity checks from 9 seconds to 3
seconds on an 88MB update.  The improvements I plan to maker are to
hide some of the threading behind an _alpm_for_each_cpu() API, use it
for delta integrity checks too, and divide the packages between the
cores evenly in terms of package size rather than package count.  And
of course add the missing error checks.

diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
index 859b8c9..595abd9 100644
--- a/lib/libalpm/sync.c
+++ b/lib/libalpm/sync.c
@@ -32,6 +32,7 @@
 #include <unistd.h>
 #include <time.h>
 #include <limits.h>
+#include <pthread.h>

 /* libalpm */
 #include "sync.h"
@@ -687,6 +688,79 @@ static int test_md5sum(pmtrans_t *trans, const
char *filename,
 	return(ret);
 }

+/* Thread payload for checking package integrity */
+typedef struct {
+	size_t thread, numcpus, numtargs;
+	volatile size_t *current;
+	pmtrans_t *trans;
+	int *errors;
+	alpm_list_t **data;
+
+	pthread_mutex_t *mutex;
+} _alpm_integrity_payload;
+
+static void *_alpm_integrity_check_thread(void *ptr)
+{
+	_alpm_integrity_payload *payload = ptr;
+	pmtrans_t *trans = payload->trans;
+
+	alpm_list_t *i;
+	size_t count = 0, range = (payload->numtargs * payload->thread) /
payload->numcpus;
+	for(i = trans->add; count < range; i = i->next, count++);
+
+	range = (payload->numtargs * (payload->thread + 1)) / payload->numcpus;
+	pthread_mutex_lock(payload->mutex);
+	for(; count < range; i = i->next, count++) {
+		pmpkg_t *spkg = i->data;
+		if(spkg->origin == PKG_FROM_FILE) {
+			continue; /* pkg_load() has been already called, this package is valid */
+		}
+
+		const char *filename = alpm_pkg_get_filename(spkg);
+		const char *md5sum = alpm_pkg_get_md5sum(spkg);
+
+		pthread_mutex_unlock(payload->mutex);
+		int test = test_md5sum(trans, filename, md5sum);
+		pthread_mutex_lock(payload->mutex);
+
+		if (test != 0) {
+			(*payload->errors)++;
+			*payload->data = alpm_list_add(*payload->data, strdup(filename));
+			goto next;
+		}
+		/* load the package file and replace pkgcache entry with it in the
target list */
+		/* TODO: alpm_pkg_get_db() will not work on this target anymore */
+		_alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file
for target %s\n", spkg->name);
+		char *filepath = _alpm_filecache_find(filename);
+		pmpkg_t *pkgfile;
+
+		pthread_mutex_unlock(payload->mutex);
+		int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
+		pthread_mutex_lock(payload->mutex);
+
+		if(loaded != 0) {
+			_alpm_pkg_free(pkgfile);
+			(*payload->errors)++;
+			*payload->data = alpm_list_add(*payload->data, strdup(filename));
+			FREE(filepath);
+			goto next;
+		}
+		FREE(filepath);
+		pkgfile->reason = spkg->reason; /* copy over install reason */
+		i->data = pkgfile;
+		_alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
+
+	next:
+		(*payload->current)++;
+		int percent = (*payload->current * 100) / payload->numtargs;
+		PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
+				payload->numtargs, *payload->current);
+	}
+	pthread_mutex_unlock(payload->mutex);
+
+	return(NULL);
+}
+
 int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 {
 	alpm_list_t *i, *j, *files = NULL;
@@ -821,41 +895,36 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t
*db_local, alpm_list_t **data)
 	numtargs = alpm_list_count(trans->add);
 	EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);

-	errors = 0;
-	for(i = trans->add; i; i = i->next, current++) {
-		pmpkg_t *spkg = i->data;
-		int percent = (current * 100) / numtargs;
-		if(spkg->origin == PKG_FROM_FILE) {
-			continue; /* pkg_load() has been already called, this package is valid */
-		}
-		PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
-				numtargs, current);
+	long numcpus = sysconf(_SC_NPROCESSORS_ONLN);
+	if(numcpus < 1) {
+		numcpus = 1;
+	}

-		const char *filename = alpm_pkg_get_filename(spkg);
-		const char *md5sum = alpm_pkg_get_md5sum(spkg);
+	static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+	_alpm_integrity_payload payload = {
+		.numcpus  = numcpus,
+		.numtargs = numtargs,
+		.current  = &current,
+		.trans	  = trans,
+		.errors	  = &errors,
+		.data	  = data,
+		.mutex	  = &mutex
+	};

-		if(test_md5sum(trans, filename, md5sum) != 0) {
-			errors++;
-			*data = alpm_list_add(*data, strdup(filename));
-			continue;
+	errors = 0;
+	{
+		pthread_t threads[numcpus];
+		_alpm_integrity_payload payloads[numcpus];
+		for(int i = 0; i < numcpus; i++) {
+			payloads[i] = payload;
+			payloads[i].thread = i;
+			pthread_create(&threads[i], NULL, _alpm_integrity_check_thread,
&payloads[i]);
 		}
-		/* load the package file and replace pkgcache entry with it in the
target list */
-		/* TODO: alpm_pkg_get_db() will not work on this target anymore */
-		_alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file
for target %s\n", spkg->name);
-		char *filepath = _alpm_filecache_find(filename);
-		pmpkg_t *pkgfile;
-		if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
-			_alpm_pkg_free(pkgfile);
-			errors++;
-			*data = alpm_list_add(*data, strdup(filename));
-			FREE(filepath);
-			continue;
+		for(int i = 0; i < numcpus; i++) {
+			pthread_join(threads[i], NULL);
 		}
-		FREE(filepath);
-		pkgfile->reason = spkg->reason; /* copy over install reason */
-		i->data = pkgfile;
-		_alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
 	}
+
 	PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
 			numtargs, current);
 	EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);

[1]: http://gitorious.org/dimension/dimension/blobs/master/libdimension/platform.c#line82

-- 
Tavian Barnes
-------------- next part --------------
A non-text attachment was scrubbed...
Name: parallelise-integrity-checks.patch
Type: text/x-patch
Size: 5201 bytes
Desc: not available
URL: <http://mailman.archlinux.org/pipermail/pacman-dev/attachments/20110219/072d6995/attachment-0001.bin>


More information about the pacman-dev mailing list