[aur-dev] [PATCH] git-serve: Use Python exceptions for error handling

Lukas Fleischer lfleischer at archlinux.org
Sun Dec 25 11:40:17 UTC 2016


Make it easier to reuse the helper functions provided by git-serve from
another Python script by throwing exceptions instead of terminating the
program on errors.

Signed-off-by: Lukas Fleischer <lfleischer at archlinux.org>
---
 aurweb/exceptions.py |  53 +++++++++++++++++
 aurweb/git/serve.py  | 157 +++++++++++++++++++++++++--------------------------
 2 files changed, 131 insertions(+), 79 deletions(-)
 create mode 100644 aurweb/exceptions.py

diff --git a/aurweb/exceptions.py b/aurweb/exceptions.py
new file mode 100644
index 0000000..5922b2d
--- /dev/null
+++ b/aurweb/exceptions.py
@@ -0,0 +1,53 @@
+class AurwebException(Exception):
+    pass
+
+
+class MaintenanceException(AurwebException):
+    pass
+
+
+class PermissionDeniedException(AurwebException):
+    def __init__(self, user):
+        msg = 'permission denied: {:s}'.format(user)
+        super(PermissionDeniedException, self).__init__(msg)
+
+
+class InvalidUserException(AurwebException):
+    def __init__(self, user):
+        msg = 'unknown user: {:s}'.format(user)
+        super(InvalidUserException, self).__init__(msg)
+
+
+class InvalidPackageBaseException(AurwebException):
+    def __init__(self, pkgbase):
+        msg = 'package base not found: {:s}'.format(pkgbase)
+        super(InvalidPackageBaseException, self).__init__(msg)
+
+
+class InvalidRepositoryNameException(AurwebException):
+    def __init__(self, pkgbase):
+        msg = 'invalid repository name: {:s}'.format(pkgbase)
+        super(InvalidRepositoryNameException, self).__init__(msg)
+
+
+class PackageBaseExistsException(AurwebException):
+    def __init__(self, pkgbase):
+        msg = 'package base already exists: {:s}'.format(pkgbase)
+        super(PackageBaseExistsException, self).__init__(msg)
+
+
+class InvalidReasonException(AurwebException):
+    def __init__(self, reason):
+        msg = 'invalid reason: {:s}'.format(reason)
+        super(InvalidReasonException, self).__init__(msg)
+
+
+class InvalidCommentException(AurwebException):
+    def __init__(self, comment):
+        msg = 'comment is too short: {:s}'.format(comment)
+        super(InvalidCommentException, self).__init__(msg)
+
+
+class InvalidArgumentsException(AurwebException):
+    def __init__(self, msg):
+        super(InvalidArgumentsException, self).__init__(msg)
diff --git a/aurweb/git/serve.py b/aurweb/git/serve.py
index 08c6541..e33fbeb 100755
--- a/aurweb/git/serve.py
+++ b/aurweb/git/serve.py
@@ -9,6 +9,7 @@ import time
 
 import aurweb.config
 import aurweb.db
+import aurweb.exceptions
 
 notify_cmd = aurweb.config.get('notifications', 'notify-cmd')
 
@@ -40,7 +41,7 @@ def list_repos(user):
     cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
     userid = cur.fetchone()[0]
     if userid == 0:
-        die('{:s}: unknown user: {:s}'.format(action, user))
+        raise aurweb.exceptions.InvalidUserException(user)
 
     cur = conn.execute("SELECT Name, PackagerUID FROM PackageBases " +
                        "WHERE MaintainerUID = ?", [userid])
@@ -51,16 +52,16 @@ def list_repos(user):
 
 def create_pkgbase(pkgbase, user):
     if not re.match(repo_regex, pkgbase):
-        die('{:s}: invalid repository name: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.InvalidRepositoryNameException(pkgbase)
     if pkgbase_exists(pkgbase):
-        die('{:s}: package base already exists: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.PackageBaseExistsException(pkgbase)
 
     conn = aurweb.db.Connection()
 
     cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
     userid = cur.fetchone()[0]
     if userid == 0:
-        die('{:s}: unknown user: {:s}'.format(action, user))
+        raise aurweb.exceptions.InvalidUserException(user)
 
     now = int(time.time())
     cur = conn.execute("INSERT INTO PackageBases (Name, SubmittedTS, " +
@@ -79,19 +80,19 @@ def create_pkgbase(pkgbase, user):
 def pkgbase_adopt(pkgbase, user, privileged):
     pkgbase_id = pkgbase_from_name(pkgbase)
     if not pkgbase_id:
-        die('{:s}: package base not found: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
 
     conn = aurweb.db.Connection()
 
     cur = conn.execute("SELECT ID FROM PackageBases WHERE ID = ? AND " +
                        "MaintainerUID IS NULL", [pkgbase_id])
     if not privileged and not cur.fetchone():
-        die('{:s}: permission denied: {:s}'.format(action, user))
+        raise aurweb.exceptions.PermissionDeniedException(user)
 
     cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
     userid = cur.fetchone()[0]
     if userid == 0:
-        die('{:s}: unknown user: {:s}'.format(action, user))
+        raise aurweb.exceptions.InvalidUserException(user)
 
     cur = conn.execute("UPDATE PackageBases SET MaintainerUID = ? " +
                        "WHERE ID = ?", [userid, pkgbase_id])
@@ -127,10 +128,10 @@ def pkgbase_get_comaintainers(pkgbase):
 def pkgbase_set_comaintainers(pkgbase, userlist, user, privileged):
     pkgbase_id = pkgbase_from_name(pkgbase)
     if not pkgbase_id:
-        die('{:s}: package base not found: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
 
     if not privileged and not pkgbase_has_full_access(pkgbase, user):
-        die('{:s}: permission denied: {:s}'.format(action, user))
+        raise aurweb.exceptions.PermissionDeniedException(user)
 
     conn = aurweb.db.Connection()
 
@@ -142,7 +143,7 @@ def pkgbase_set_comaintainers(pkgbase, userlist, user, privileged):
                            [olduser])
         userid = cur.fetchone()[0]
         if userid == 0:
-            die('{:s}: unknown user: {:s}'.format(action, user))
+            raise aurweb.exceptions.InvalidUserException(olduser)
         uids_old.add(userid)
 
     uids_new = set()
@@ -151,7 +152,7 @@ def pkgbase_set_comaintainers(pkgbase, userlist, user, privileged):
                            [newuser])
         userid = cur.fetchone()[0]
         if userid == 0:
-            die('{:s}: unknown user: {:s}'.format(action, user))
+            raise aurweb.exceptions.InvalidUserException(newuser)
         uids_new.add(userid)
 
     uids_add = uids_new - uids_old
@@ -196,10 +197,10 @@ def pkgreq_by_pkgbase(pkgbase_id, reqtype):
     return [row[0] for row in cur.fetchall()]
 
 
-def pkgreq_close(reqid, reason, comments, autoclose=False):
+def pkgreq_close(reqid, user, reason, comments, autoclose=False):
     statusmap = {'accepted': 2, 'rejected': 3}
     if reason not in statusmap:
-        die('{:s}: invalid reason: {:s}'.format(action, reason))
+        raise aurweb.exceptions.InvalidReasonException(reason)
     status = statusmap[reason]
 
     conn = aurweb.db.Connection()
@@ -210,7 +211,7 @@ def pkgreq_close(reqid, reason, comments, autoclose=False):
         cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
         userid = cur.fetchone()[0]
         if userid == 0:
-            die('{:s}: unknown user: {:s}'.format(action, user))
+            raise aurweb.exceptions.InvalidUserException(user)
 
     conn.execute("UPDATE PackageRequests SET Status = ?, ClosureComment = ? " +
                  "WHERE ID = ?", [status, comments, reqid])
@@ -224,18 +225,18 @@ def pkgreq_close(reqid, reason, comments, autoclose=False):
 def pkgbase_disown(pkgbase, user, privileged):
     pkgbase_id = pkgbase_from_name(pkgbase)
     if not pkgbase_id:
-        die('{:s}: package base not found: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
 
     initialized_by_owner = pkgbase_has_full_access(pkgbase, user)
     if not privileged and not initialized_by_owner:
-        die('{:s}: permission denied: {:s}'.format(action, user))
+        raise aurweb.exceptions.PermissionDeniedException(user)
 
     # TODO: Support disowning package bases via package request.
 
     # Scan through pending orphan requests and close them.
     comment = 'The user {:s} disowned the package.'.format(user)
     for reqid in pkgreq_by_pkgbase(pkgbase_id, 'orphan'):
-        pkgreq_close(reqid, 'accepted', comment, True)
+        pkgreq_close(reqid, user, 'accepted', comment, True)
 
     comaintainers = []
     new_maintainer_userid = None
@@ -262,7 +263,7 @@ def pkgbase_disown(pkgbase, user, privileged):
     cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
     userid = cur.fetchone()[0]
     if userid == 0:
-        die('{:s}: unknown user: {:s}'.format(action, user))
+        raise aurweb.exceptions.InvalidUserException(user)
 
     subprocess.Popen((notify_cmd, 'disown', str(pkgbase_id), str(userid)))
 
@@ -272,14 +273,16 @@ def pkgbase_disown(pkgbase, user, privileged):
 def pkgbase_flag(pkgbase, user, comment):
     pkgbase_id = pkgbase_from_name(pkgbase)
     if not pkgbase_id:
-        die('{:s}: package base not found: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
+    if len(comment) < 3:
+        raise aurweb.exceptions.InvalidCommentException(comment)
 
     conn = aurweb.db.Connection()
 
     cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
     userid = cur.fetchone()[0]
     if userid == 0:
-        die('{:s}: unknown user: {:s}'.format(action, user))
+        raise aurweb.exceptions.InvalidUserException(user)
 
     now = int(time.time())
     conn.execute("UPDATE PackageBases SET " +
@@ -295,14 +298,14 @@ def pkgbase_flag(pkgbase, user, comment):
 def pkgbase_unflag(pkgbase, user):
     pkgbase_id = pkgbase_from_name(pkgbase)
     if not pkgbase_id:
-        die('{:s}: package base not found: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
 
     conn = aurweb.db.Connection()
 
     cur = conn.execute("SELECT ID FROM Users WHERE Username = ?", [user])
     userid = cur.fetchone()[0]
     if userid == 0:
-        die('{:s}: unknown user: {:s}'.format(action, user))
+        raise aurweb.exceptions.InvalidUserException(user)
 
     if user in pkgbase_get_comaintainers(pkgbase):
         conn.execute("UPDATE PackageBases SET OutOfDateTS = NULL " +
@@ -318,7 +321,7 @@ def pkgbase_unflag(pkgbase, user):
 def pkgbase_set_keywords(pkgbase, keywords):
     pkgbase_id = pkgbase_from_name(pkgbase)
     if not pkgbase_id:
-        die('{:s}: package base not found: {:s}'.format(action, pkgbase))
+        raise aurweb.exceptions.InvalidPackageBaseException(pkgbase)
 
     conn = aurweb.db.Connection()
 
@@ -377,29 +380,33 @@ def usage(cmds):
     exit(0)
 
 
-def main():
-    user = os.environ.get('AUR_USER')
-    privileged = (os.environ.get('AUR_PRIVILEGED', '0') == '1')
-    ssh_cmd = os.environ.get('SSH_ORIGINAL_COMMAND')
-    ssh_client = os.environ.get('SSH_CLIENT')
+def checkarg_atleast(cmdargv, *argdesc):
+    if len(cmdargv) - 1 < len(argdesc):
+        msg = 'missing {:s}'.format(argdesc[len(cmdargv) - 1])
+        raise aurweb.exceptions.InvalidArgumentsException(msg)
+
+
+def checkarg_atmost(cmdargv, *argdesc):
+    if len(cmdargv) - 1 > len(argdesc):
+        raise aurweb.exceptions.InvalidArgumentsException('too many arguments')
 
-    if not ssh_cmd:
-        die_with_help("Interactive shell is disabled.")
-    cmdargv = shlex.split(ssh_cmd)
-    action = cmdargv[0]
-    remote_addr = ssh_client.split(' ')[0] if ssh_client else None
 
+def checkarg(cmdargv, *argdesc):
+    checkarg_atleast(cmdargv, *argdesc)
+    checkarg_atmost(cmdargv, *argdesc)
+
+
+def serve(action, cmdargv, user, privileged, remote_addr):
     if enable_maintenance:
         if remote_addr not in maintenance_exc:
-            die("The AUR is down due to maintenance. We will be back soon.")
+            raise aurweb.exceptions.MaintenanceException
 
     if action == 'git' and cmdargv[1] in ('upload-pack', 'receive-pack'):
         action = action + '-' + cmdargv[1]
         del cmdargv[1]
 
     if action == 'git-upload-pack' or action == 'git-receive-pack':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing path".format(action))
+        checkarg(cmdargv, 'path')
 
         path = cmdargv[1].rstrip('/')
         if not path.startswith('/'):
@@ -408,11 +415,11 @@ def main():
             path = path + '.git'
         pkgbase = path[1:-4]
         if not re.match(repo_regex, pkgbase):
-            die('{:s}: invalid repository name: {:s}'.format(action, pkgbase))
+            raise aurweb.exceptions.InvalidRepositoryNameException(pkgbase)
 
         if action == 'git-receive-pack' and pkgbase_exists(pkgbase):
             if not privileged and not pkgbase_has_write_access(pkgbase, user):
-                die('{:s}: permission denied: {:s}'.format(action, user))
+                raise aurweb.exceptions.PermissionDeniedException(user)
 
         os.environ["AUR_USER"] = user
         os.environ["AUR_PKGBASE"] = pkgbase
@@ -420,79 +427,48 @@ def main():
         cmd = action + " '" + repo_path + "'"
         os.execl(git_shell_cmd, git_shell_cmd, '-c', cmd)
     elif action == 'set-keywords':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
+        checkarg_atleast(cmdargv, 'repository name')
         pkgbase_set_keywords(cmdargv[1], cmdargv[2:])
     elif action == 'list-repos':
-        if len(cmdargv) > 1:
-            die_with_help("{:s}: too many arguments".format(action))
+        checkarg(cmdargv)
         list_repos(user)
     elif action == 'setup-repo':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
-        if len(cmdargv) > 2:
-            die_with_help("{:s}: too many arguments".format(action))
+        checkarg(cmdargv, 'repository name')
         warn('{:s} is deprecated. '
              'Use `git push` to create new repositories.'.format(action))
         create_pkgbase(cmdargv[1], user)
     elif action == 'restore':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
-        if len(cmdargv) > 2:
-            die_with_help("{:s}: too many arguments".format(action))
+        checkarg(cmdargv, 'repository name')
 
         pkgbase = cmdargv[1]
-        if not re.match(repo_regex, pkgbase):
-            die('{:s}: invalid repository name: {:s}'.format(action, pkgbase))
-
-        if pkgbase_exists(pkgbase):
-            die('{:s}: package base exists: {:s}'.format(action, pkgbase))
         create_pkgbase(pkgbase, user)
 
         os.environ["AUR_USER"] = user
         os.environ["AUR_PKGBASE"] = pkgbase
         os.execl(git_update_cmd, git_update_cmd, 'restore')
     elif action == 'adopt':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
-        if len(cmdargv) > 2:
-            die_with_help("{:s}: too many arguments".format(action))
+        checkarg(cmdargv, 'repository name')
 
         pkgbase = cmdargv[1]
         pkgbase_adopt(pkgbase, user, privileged)
     elif action == 'disown':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
-        if len(cmdargv) > 2:
-            die_with_help("{:s}: too many arguments".format(action))
+        checkarg(cmdargv, 'repository name')
 
         pkgbase = cmdargv[1]
         pkgbase_disown(pkgbase, user, privileged)
     elif action == 'flag':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
-        if len(cmdargv) < 3:
-            die_with_help("{:s}: missing comment".format(action))
-        if len(cmdargv) > 3:
-            die_with_help("{:s}: too many arguments".format(action))
+        checkarg(cmdargv, 'repository name', 'comment')
 
         pkgbase = cmdargv[1]
         comment = cmdargv[2]
-        if len(comment) < 3:
-            die_with_help("{:s}: comment is too short".format(action))
-
         pkgbase_flag(pkgbase, user, comment)
     elif action == 'unflag':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
-        if len(cmdargv) > 2:
-            die_with_help("{:s}: too many arguments".format(action))
+        checkarg(cmdargv, 'repository name')
 
         pkgbase = cmdargv[1]
         pkgbase_unflag(pkgbase, user)
     elif action == 'set-comaintainers':
-        if len(cmdargv) < 2:
-            die_with_help("{:s}: missing repository name".format(action))
+        checkarg_atleast(cmdargv, 'repository name')
 
         pkgbase = cmdargv[1]
         userlist = cmdargv[2:]
@@ -514,7 +490,30 @@ def main():
         }
         usage(cmds)
     else:
-        die_with_help("invalid command: {:s}".format(action))
+        msg = 'invalid command: {:s}'.format(action)
+        raise aurweb.exceptions.InvalidArgumentsException(msg)
+
+
+def main():
+    user = os.environ.get('AUR_USER')
+    privileged = (os.environ.get('AUR_PRIVILEGED', '0') == '1')
+    ssh_cmd = os.environ.get('SSH_ORIGINAL_COMMAND')
+    ssh_client = os.environ.get('SSH_CLIENT')
+
+    if not ssh_cmd:
+        die_with_help("Interactive shell is disabled.")
+    cmdargv = shlex.split(ssh_cmd)
+    action = cmdargv[0]
+    remote_addr = ssh_client.split(' ')[0] if ssh_client else None
+
+    try:
+        serve(action, cmdargv, user, privileged, remote_addr)
+    except aurweb.exceptions.MaintenanceException:
+        die("The AUR is down due to maintenance. We will be back soon.")
+    except aurweb.exceptions.InvalidArgumentsException as e:
+        die_with_help('{:s}: {}'.format(action, e))
+    except aurweb.exceptions.AurwebException as e:
+        die('{:s}: {}'.format(action, e))
 
 
 if __name__ == '__main__':
-- 
2.11.0


More information about the aur-dev mailing list