Mercurial > ~dholland > hg > swallowtail > index.cgi
view shelltools/query-pr/query.py @ 50:bb0ece71355e
Untabify.
author | David A. Holland |
---|---|
date | Sat, 02 Apr 2022 18:10:52 -0400 |
parents | 4b7f0ee35994 |
children | ef6d572c4e1e |
line wrap: on
line source
#!@PYTHON@

import sys
import argparse

# The database driver is imported defensively so that the module can
# still be loaded (e.g. for --help or --version) on a machine that
# does not have it; opendb() reports the problem if a connection is
# actually needed.
try:
    import psycopg2
except ImportError:
    psycopg2 = None

program_description = """
Search for and retrieve problem reports.
"""
program_version = "@VERSION@"

############################################################
# settings

outfile = sys.stdout
outmaterial = "headers"
outformat = "text"

############################################################
# database field access

#
# Fields of PRs that we might search are spread across a number of
# tables and require varying joins to get them. And, because of
# classification schemes, the set of fields isn't static and we can't
# just assemble a massive view with one column for each field.
#
# The QueryBuilder class knows how to arrange for all known fields to
# be present.
#

# Names of the known classification schemes, by scheme type. These
# start out empty and are filled in by fetch_classifications() once
# the database is open; getfield() consults them.
hierclasses = []
flatclasses = []
textclasses = []
tagclasses = []

class QueryBuilder:
    # these fields are in the PRs table
    prtable_fields = [
        "id", "synopsis", "confidential", "state", "locked",
        "timeout_date", "timeout_state",
        "arrival_schemaversion", "arrival_date", "modified_date",
        "closed_date",
        "release", "environment"
    ]

    # these fields are aliases for others
    alias_fields = {
        "number" : "id",
        "date" : "arrival_date",
    }

    def __init__(self):
        # field name -> sql expression, for fields already arranged for
        self.present = {}
        # join keys already added (see addjoin)
        self.joined = {}
        # FROM items, WHERE items, and interned values, in order
        self.fromitems = []
        self.whereitems = []
        self.args = []
        # ORDER BY text, or None
        self.order = None

    def setorder(self, order):
        self.order = order

    # true if anything has been added to this query
    def nonempty(self):
        if self.fromitems or self.whereitems:
            return True
        return self.order is not None

    # add to present{} and return the value for convenience (internal)
    def makepresent(self, field, name):
        self.present[field] = name
        return name

    # add a join item (once only) (internal)
    def addjoin(self, table, as_ = None):
        if as_ is not None:
            key = table + "-" + as_
            val = table + " AS " + as_
        else:
            key = table
            val = table
        if key not in self.joined:
            self.joined[key] = True
            self.fromitems.append(val)

    # add a where item (once only); several fields share the same
    # join conditions, so duplicates are dropped
    def addwhere(self, expr):
        if expr not in self.whereitems:
            self.whereitems.append(expr)

    # stash an untrusted value as a query argument and return the
    # placeholder to put in the sql text in its place. (psycopg2
    # placeholders are always "%s", filled in positionally.)
    def intern(self, value):
        self.args.append(value)
        return "%s"

    # join a per-scheme classification table and return the sql
    # expression for the scheme's value (internal)
    def getclassfield(self, field, table):
        col = field + "_data"
        self.addjoin("PRs")
        self.addjoin(table, col)
        self.addwhere("PRs.id = %s.pr" % col)
        # field is one of the known scheme names, so it is safe to
        # insert directly
        self.addwhere("%s.scheme = '%s'" % (col, field))
        return self.makepresent(field, "%s.value" % col)

    # returns a sql expression for the field
    def getfield(self, field):
        # already-fetched fields
        if field in self.present:
            return self.present[field]

        # aliases for other fields
        if field in self.alias_fields:
            return self.getfield(self.alias_fields[field])

        # simple fields directly in the PRs table
        if field in self.prtable_fields:
            self.addjoin("PRs")
            return self.makepresent(field, "PRs." + field)

        # now it gets more interesting...
        if field == "closed":
            self.addjoin("PRs")
            self.addjoin("states")
            self.addwhere("PRs.state = states.name")
            return self.makepresent(field, "states.closed")

        # XXX let's pick one set of names and use them everywhere
        # (e.g. change "posttime" in the schema to "message_date"
        # or something)
        if field == "comment_date" or field == "posttime":
            self.addjoin("PRs")
            self.addjoin("messages")
            self.addwhere("PRs.id = messages.pr")
            return self.makepresent(field, "messages.posttime")

        if field == "comment" or field == "message" or field == "post":
            self.addjoin("PRs")
            self.addjoin("messages")
            self.addwhere("PRs.id = messages.pr")
            return self.makepresent(field, "messages.body")

        if field == "attachment":
            self.addjoin("PRs")
            self.addjoin("messages")
            self.addjoin("attachments")
            self.addwhere("PRs.id = messages.pr")
            self.addwhere("messages.id = attachments.msgid")
            return self.makepresent(field, "attachments.body")

        if field == "patch":
            self.addjoin("PRs")
            self.addjoin("messages")
            self.addjoin("attachments", "patches")
            self.addwhere("PRs.id = messages.pr")
            self.addwhere("messages.id = patches.msgid")
            self.addwhere("patches.mimetype = " +
                          "'application/x-patch'")
            return self.makepresent(field, "patches.body")

        if field == "mimetype":
            subquery = "((SELECT mtmessages1.pr as pr, " + \
                       "mtmessages1.mimetype as mimetype " + \
                       "FROM messages as mtmessages1) " + \
                       "UNION " + \
                       "(SELECT mtmessages2.pr as pr, " + \
                       "mtattach2.mimetype as mimetype " + \
                       "FROM messages as mtmessages2, " + \
                       " attachments as mtattach2 " + \
                       "WHERE mtmessages2.id = mtattach2.msgid))"
            self.addjoin("PRs")
            self.addjoin(subquery, "mimetypes")
            self.addwhere("PRs.id = mimetypes.pr")
            return self.makepresent(field, "mimetypes.mimetype")

        # XXX: need view userstrings
        # select (id, username as name) from users
        # union select (id, realname as name) from users
        # (allow searching emails? ugh)
        if field == "originator" or field == "submitter":
            self.addjoin("PRs")
            self.addjoin("userstrings", "originators")
            self.addwhere("PRs.originator = originators.id")
            return self.makepresent(field, "originators.name")

        if field == "reporter" or field == "respondent":
            self.addjoin("PRs")
            self.addjoin("subscriptions")
            self.addjoin("userstrings", "reporters")
            # without this the subscription rows are not tied to any
            # particular PR and every PR matches
            self.addwhere("PRs.id = subscriptions.pr")
            self.addwhere("subscriptions.userid = reporters.id")
            self.addwhere("subscriptions.reporter")
            return self.makepresent(field, "reporters.name")

        if field == "responsible":
            self.addjoin("PRs")
            self.addjoin("subscriptions")
            self.addjoin("userstrings", "responsibles")
            # as above, tie the subscription to the PR
            self.addwhere("PRs.id = subscriptions.pr")
            self.addwhere("subscriptions.userid = responsibles.id")
            self.addwhere("subscriptions.responsible")
            return self.makepresent(field, "responsibles.name")

        # classification schemes; all four kinds work the same way
        # apart from which table holds the data
        classtables = [
            (hierclasses, "hierclass_data"),
            (flatclasses, "flatclass_data"),
            (textclasses, "textclass_data"),
            (tagclasses, "tagclass_data"),
        ]
        for (names, table) in classtables:
            if field in names:
                return self.getclassfield(field, table)

        sys.stderr.write("Unknown field %s\n" % field)
        sys.exit(1)
    # end getfield

    # emit sql
    #
    # sels is the list of sql expressions to select; a single
    # expression passed as a bare string is accepted too. Clauses
    # with nothing in them are left out rather than emitted empty.
    def build(self, sels):
        if isinstance(sels, str):
            sels = [sels]
        q = "SELECT %s\n" % ", ".join(sels)
        if self.fromitems:
            q = q + "FROM %s\n" % ", ".join(self.fromitems)
        if self.whereitems:
            q = q + "WHERE %s\n" % " and ".join(self.whereitems)
        if self.order is not None:
            q = q + "ORDER BY " + self.order + "\n"
        return q
    # end build
# end class QueryBuilder

# XXX we need to add dynamically:
#    hierclass_names.name to hierclasses[]
#    flatclass_names.name to flatclasses[]
#    textclass_names.name to textclasses[]
#    tagclass_names.name to tagclasses[]
# (fetch_classifications below does this.)

############################################################
# database

dblink = None

# open the database connection (sets the global dblink)
def opendb():
    global dblink

    if psycopg2 is None:
        sys.stderr.write("The psycopg2 module is not available.\n")
        sys.exit(1)

    host = "localhost"
    user = "swallowtail"
    database = "swallowtail"
    dblink = psycopg2.connect("host=%s user=%s dbname=%s" %
                              (host, user, database))
# end opendb

# close the database connection, if one is open
def closedb():
    global dblink

    if dblink is not None:
        dblink.close()
    dblink = None
# end closedb

# run one query and return all the result rows
#
# qtext is sql text using psycopg2 "%s" placeholders and args is the
# sequence of values for them (or None).
def querydb(qtext, args):
    # Debug trace. This goes to stderr so it cannot get mixed into
    # the messages and attachments printed on stdout.
    sys.stderr.write("Executing this query:\n")
    sys.stderr.write("%s\n" % (qtext,))
    sys.stderr.write("Args are:\n")
    sys.stderr.write("%s\n" % (args,))

    cursor = dblink.cursor()
    try:
        cursor.execute(qtext, args)
        result = cursor.fetchall()
    finally:
        cursor.close()
    return result
# end querydb

# load the names of the classification schemes from the database
#
# NOTE(review): the table and column names are taken from the XXX
# note after QueryBuilder; confirm them against the schema.
def fetch_classifications():
    schemes = [
        ("hierclass_names", hierclasses),
        ("flatclass_names", flatclasses),
        ("textclass_names", textclasses),
        ("tagclass_names", tagclasses),
    ]
    for (table, names) in schemes:
        del names[:]
        for row in querydb("SELECT name FROM %s" % table, None):
            names.append(row[0])
# end fetch_classifications

############################################################
# query class for searches
# XXX: obsolete, remove

class Query:
    def __init__(self):
        self.selections = []
        self.tables = []
        self.constraints = []
        self.args = []
    prtables = ["PRs"]
    prconstraints = []

    def select(self, s):
        self.selections.append(s)

    def addtable(self, t):
        assert(t not in self.tables)
        self.tables.append(t)

    def constrain(self, expr):
        self.constraints.append(expr)

    # stash a value as a query argument and return its placeholder
    # (psycopg2 placeholders are always "%s", filled positionally)
    def internval(self, val):
        self.args.append(val)
        return "%s"

    def textify(self):
        s = "SELECT %s\n" % ",".join(self.selections)
        f = "FROM %s\n" % ",".join(self.tables)
        if self.constraints:
            w = "WHERE %s\n" % " AND ".join(self.constraints)
        else:
            # an empty WHERE clause is a syntax error
            w = ""
        return s + f + w
# end class Query

# constrain field to match value, either exactly or as a regexp
#
# The constraint is added to q and also returned.
#
# NOTE(review): isregexp is not defined anywhere in this file.
def regexp_constraint(q, field, value):
    cleanval = q.internval(value)
    if not isregexp(value):
        expr = "%s = %s" % (field, cleanval)
    else:
        # "~" is the PostgreSQL (POSIX) regexp match operator
        expr = "%s ~ %s" % (field, cleanval)
    q.constrain(expr)
    return expr
# end regexp_constraint

# constrain field to an integer range
#
# value is a (lower, upper) pair; either end may be None, meaning
# unbounded.
def intrange_constraint(q, field, value):
    (lower, upper) = value
    if lower is not None:
        assert(isinstance(lower, int))
        q.constrain("%s >= %d" % (field, lower))
    if upper is not None:
        assert(isinstance(upper, int))
        q.constrain("%s <= %d" % (field, upper))
# end intrange_constraint

def daterange_constraint(q, field, value):
    # XXX
    raise NotImplementedError("date range constraints")
# end daterange_constraint

############################################################

# this is old code that needs to be merged or deleted into the new stuff
#
# NOTE(review): this relies on names that are not defined in this
# file: args, get_attachment, get_message, show_prs, choke,
# classification_schemes, and classification_schemetypes.
def oldstuff():

    # If we're doing something other than a search, do it now
    if args.attach is not None:
        get_attachment(args.attach)
        sys.exit(0)
    if args.message is not None:
        get_message(args.message)
        sys.exit(0)

    if args.prs is not None and len(args.prs) > 0:
        show_prs(args.prs)
        sys.exit(0)

    #
    # Collect up the search constraints
    #

    # 1. Constraints on the PRs table
    checkprtable = False
    prq = Query()
    prq.select("PRs.id as id")
    prq.addtable("PRs")
    if not args.closed:
        checkprtable = True
        prq.addtable("states")
        prq.constrain("PRs.state = states.name")
        prq.constrain("states.closed = FALSE")
    if args.public:
        checkprtable = True
        prq.constrain("NOT PRs.confidential")
    if args.number is not None:
        checkprtable = True
        intrange_constraint(prq, "PRs.id", args.number)
    if args.synopsis is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.synopsis", args.synopsis)
    if args.confidential is not None:
        checkprtable = True
        assert(isinstance(args.confidential, bool))
        if args.confidential:
            prq.constrain("PRs.confidential")
        else:
            prq.constrain("not PRs.confidential")
    if args.state is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.state", args.state)
    if args.locked is not None:
        checkprtable = True
        assert(isinstance(args.locked, bool))
        if args.locked:
            prq.constrain("PRs.locked")
        else:
            prq.constrain("not PRs.locked")
    if args.arrival_schemaversion is not None:
        checkprtable = True
        intrange_constraint(prq, "PRs.arrival_schemaversion",
                            args.arrival_schemaversion)
    if args.arrival_date is not None:
        checkprtable = True
        daterange_constraint(prq, "PRs.arrival_date",
                             args.arrival_date)
    if args.closed_date is not None:
        checkprtable = True
        daterange_constraint(prq, "PRs.closed_date",
                             args.closed_date)
    if args.last_modified is not None:
        checkprtable = True
        daterange_constraint(prq, "PRs.last_modified",
                             args.last_modified)
    if args.release is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.release", args.release)
    if args.environment is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.environment", args.environment)

    if args.originator_name is not None or \
       args.originator_email is not None:
        prq.addtable("usermail as originator")
        prq.constrain("PRs.originator = originator.id")
    if args.originator_name is not None:
        checkprtable = True
        regexp_constraint(prq, "originator.realname",
                          args.originator_name)
    if args.originator_email is not None:
        checkprtable = True
        regexp_constraint(prq, "originator.email",
                          args.originator_email)
    if args.originator_id is not None:
        checkprtable = True
        intrange_constraint(prq, "PRs.originator", args.originator_id)

    queries = []
    if checkprtable:
        queries.append(prq)

    if args.responsible is not None:
        sq = Query()
        sq.select("subscriptions.pr as id")
        sq.addtable("subscriptions")
        sq.addtable("users")
        sq.constrain("subscriptions.userid = users.id")
        regexp_constraint(sq, "users.realname", args.responsible)
        sq.constrain("subscriptions.responsible")
        queries.append(sq)
    if args.respondent is not None:
        sq = Query()
        sq.select("subscriptions.pr as id")
        sq.addtable("subscriptions")
        # the constraints refer to the table as "users", so it must
        # not be renamed here
        sq.addtable("users")
        sq.constrain("subscriptions.userid = users.id")
        regexp_constraint(sq, "users.realname", args.respondent)
        sq.constrain("subscriptions.reporter")
        queries.append(sq)
    if args.subscribed is not None:
        sq = Query()
        sq.select("subscriptions.pr as id")
        sq.addtable("subscriptions")
        sq.addtable("users")
        sq.constrain("subscriptions.userid = users.id")
        regexp_constraint(sq, "users.realname", args.subscribed)
        queries.append(sq)

    if args.messages is not None:
        mq = Query()
        mq.select("messages.pr as id")
        mq.addtable("messages")
        regexp_constraint(mq, "messages.text", args.messages)
        queries.append(mq)

    if args.adminlog is not None:
        aq = Query()
        aq.select("adminlog.pr as id")
        aq.addtable("adminlog")
        regexp_constraint(aq, "adminlog.change", args.adminlog)
        regexp_constraint(aq, "adminlog.comment", args.adminlog)
        # either column may match, so fold the two into one OR
        assert(len(aq.constraints) == 2)
        x = "%s OR %s" % (aq.constraints[0], aq.constraints[1])
        aq.constraints = [x]
        queries.append(aq)

    if args.anytext is not None:
        choke("--anytext isn't supported yet")

    for scheme in classification_schemes:
        if args[scheme] is not None:
            schemetype = classification_schemetypes[scheme]
            tbl = "%sclass_data" % schemetype
            cq = Query()
            cq.select("scheme.pr as id")
            cq.addtable("%s as scheme" % tbl)
            cq.constrain("scheme.scheme = '%s'" % scheme)
            regexp_constraint(cq, "scheme.value", args[scheme])
            queries.append(cq)
    # end loop

    querytexts = [q.textify() for q in queries]
    return "INTERSECT\n".join(querytexts)

############################################################
# printing

class PrintText:
    def __init__(self, output):
        self.lines = (output == "RAW" or output == "LIST")
    def printheader(self, row):
        # nothing
        pass
    def printrow(self, row):
        # XXX
        print(row)
    def printfooter(self, row):
        # nothing
        pass
# end class PrintText

class PrintCsv:
    def __init__(self, output):
        # nothing
        pass
    def printheader(self, row):
        # XXX
        pass
    def printrow(self, row):
        # XXX
        pass
    def printfooter(self, row):
        # nothing
        pass
# end class PrintCsv

class PrintXml:
    def __init__(self, output):
        # nothing
        pass
    def printheader(self, row):
        # XXX
        pass
    def printrow(self, row):
        # XXX
        pass
    def printfooter(self, row):
        # XXX
        pass
# end class PrintXml

class PrintJson:
    def __init__(self, output):
        # nothing
        pass
    def printheader(self, row):
        # XXX
        pass
    def printrow(self, row):
        # XXX
        pass
    def printfooter(self, row):
        # XXX
        pass
# end class PrintJson

class PrintRdf:
    def __init__(self, output):
        # nothing
        pass
    def printheader(self, row):
        # XXX
        pass
    def printrow(self, row):
        # XXX
        pass
    def printfooter(self, row):
        # XXX
        pass
# end class PrintRdf

class PrintRdflike:
    def __init__(self, output):
        # nothing
        pass
    def printheader(self, row):
        # XXX
        pass
    def printrow(self, row):
        # XXX
        pass
    def printfooter(self, row):
        # XXX
        pass
# end class PrintRdflike

# print the PRs with the given ids
#
# sel is the PR selection (Invocation.Selection.dopr) giving the
# material to print (sel.output) and the format (sel.outformat); if
# it is left out the defaults from getargs (LIST, TEXT) are used.
def print_prs(ids, sel = None):
    if sel is None:
        sel = Invocation.Selection.dopr("LIST", "TEXT")

    printers = {
        "TEXT": PrintText,
        "CSV": PrintCsv,
        "XML": PrintXml,
        "JSON": PrintJson,
        "RDF": PrintRdf,
        "RDFLIKE": PrintRdflike,
    }
    if sel.outformat not in printers:
        raise AssertionError("Unknown output format %s" % sel.outformat)

    # reset the printer
    printer = printers[sel.outformat](sel.output)

    if sel.output == "RAW":
        if ids:
            printer.printheader(ids[0])
            for prid in ids:
                printer.printrow(prid)
            printer.printfooter(ids[0])
        return
    elif sel.output == "LIST":
        # XXX is there a clean way to do this passing the
        # whole list of ids at once?
        query = "SELECT id, synopsis\n" + \
                "FROM PRs\n" + \
                "WHERE id = %s"
    elif sel.output == "HEADERS":
        query = None # XXX
    elif sel.output == "META":
        query = None # XXX
    elif sel.output == "FULL":
        query = None # XXX
    else:
        raise AssertionError("Unknown output type %s" % sel.output)

    if query is None:
        sys.stderr.write("Output type %s is not implemented yet.\n" %
                         sel.output)
        sys.exit(1)

    # The header goes before the first row actually found and the
    # footer after the last, so PRs that produce no rows are harmless.
    lastrow = None
    for prid in ids:
        for r in querydb(query, [prid]):
            if lastrow is None:
                printer.printheader(r)
            printer.printrow(r)
            lastrow = r
    if lastrow is not None:
        printer.printfooter(lastrow)
# end print_prs

# XXX if in public mode we need to check if the PR is public
def print_message(pr, msgnum):
    query = "SELECT users.username AS username,\n" + \
            " users.realname AS realname,\n" + \
            " messages.id AS id, parent_id,\n" + \
            " posttime, mimetype, body\n" + \
            "FROM messages, users\n" + \
            "WHERE messages.who = users.id\n" + \
            " AND messages.pr = %s\n" + \
            " AND messages.number_in_pr = %s\n"
    # Note that while pr is safe, msgnum came from the commandline
    # and may not be. (Both are passed as query arguments.)
    results = querydb(query, [pr, msgnum])
    if len(results) != 1:
        sys.stderr.write("No message %s in PR %s.\n" % (msgnum, pr))
        sys.exit(1)
    [result] = results
    (username, realname, id, parent_id, posttime, mimetype, body) = result
    # XXX honor mimetype
    # XXX honor output format (e.g. html)
    # NOTE(review): organization is not defined anywhere in this file.
    sys.stdout.write("From swallowtail@%s %s\n" % (organization,posttime))
    sys.stdout.write("From: %s (%s)\n" % (username, realname))
    sys.stdout.write("References: %s\n" % parent_id)
    sys.stdout.write("Date: %s\n" % posttime)
    sys.stdout.write("Content-Type: %s\n" % mimetype)
    sys.stdout.write("\n")
    sys.stdout.write(body)
# end print_message

# XXX if in public mode we need to check if the PR is public
def print_attachment(pr, attachnum):
    query = "SELECT a.mimetype as mimetype, a.body as body\n" + \
            "FROM messages, attachments as a\n" + \
            "WHERE messages.pr = %s\n" + \
            " AND messages.id = a.msgid\n" + \
            " AND a.number_in_pr = %s\n"
    # Note that while pr is safe, attachnum came from the
    # commandline and may not be. (Both are passed as query arguments.)
    results = querydb(query, [pr, attachnum])
    if len(results) != 1:
        sys.stderr.write("No attachment %s in PR %s.\n" % (attachnum, pr))
        sys.exit(1)
    [result] = results
    (mimetype, body) = result
    # XXX honor mimetype
    # XXX need an http output mode so we can send the mimetype!
    sys.stdout.write(body)
# end print_attachment

############################################################
# AST for query

#
# The do*() functions are constructors for the AST nodes. They are
# class methods so that they can be called on the class (as getargs
# does when it hands them to argparse) and can reach their own class
# and its constants.
#
class Invocation:
    class Query:
        Q_TERM = 1 # XXX unused so far
        Q_SQL = 2
        Q_AND = 3
        Q_OR = 4
        Q_TSTRING = 5
        Q_QSTRING = 6

        def __init__(self, type):
            self.type = type

        @classmethod
        def doterm(cls, term):
            self = cls(cls.Q_TERM)
            self.term = term
            return self

        @classmethod
        def dosql(cls, s):
            self = cls(cls.Q_SQL)
            self.sql = s
            return self

        @classmethod
        def doand(cls, qs):
            self = cls(cls.Q_AND)
            self.args = qs
            return self

        @classmethod
        def door(cls, qs):
            self = cls(cls.Q_OR)
            self.args = qs
            return self

        # query term string
        @classmethod
        def dotstring(cls, q):
            self = cls(cls.Q_TSTRING)
            self.string = q
            return self

        # whole query string
        @classmethod
        def doqstring(cls, q):
            self = cls(cls.Q_QSTRING)
            self.string = q
            return self
    # end class Query

    class Order:
        def __init__(self, field, rev = False):
            self.field = field
            self.rev = rev

        @classmethod
        def dooldest(cls, ign):
            return cls("number")

        @classmethod
        def donewest(cls, ign):
            return cls("number", True)

        @classmethod
        def dostaleness(cls, ign):
            return cls("modified_date", True)

        @classmethod
        def dofield(cls, field):
            return cls(field)

        @classmethod
        def dorevfield(cls, field):
            return cls(field, True)
    # end class Order

    class Search:
        def __init__(self, qs, openonly, publiconly, os):
            self.queries = qs
            self.openonly = openonly
            self.publiconly = publiconly
            self.orders = os
    # end class Search

    class Selection:
        S_PR = 1
        S_MESSAGE = 2
        S_ATTACHMENT = 3

        def __init__(self, type):
            self.type = type

        @classmethod
        def dopr(cls, output, outformat):
            self = cls(cls.S_PR)
            self.output = output
            self.outformat = outformat
            return self

        @classmethod
        def domessage(cls, arg):
            self = cls(cls.S_MESSAGE)
            self.message = arg
            return self

        @classmethod
        def doattachment(cls, arg):
            self = cls(cls.S_ATTACHMENT)
            self.attachment = arg
            return self
    # end class Selection

    class Op:
        # operation codes
        OP_FIELDS = 1
        OP_SHOW = 2
        OP_RANGE = 3
        OP_SEARCH = 4

        def __init__(self, type):
            self.type = type

        @classmethod
        def dofields(cls):
            return cls(cls.OP_FIELDS)

        @classmethod
        def doshow(cls, field):
            self = cls(cls.OP_SHOW)
            self.field = field
            return self

        @classmethod
        def dorange(cls, field):
            self = cls(cls.OP_RANGE)
            self.field = field
            return self

        @classmethod
        def dosearch(cls, s, sels):
            self = cls(cls.OP_SEARCH)
            self.search = s
            self.sels = sels
            return self
    # end class Op

    def __init__(self, ops):
        self.ops = ops
# end class Invocation

############################################################
# run (eval the SQL and print the results)

# print what one selection asks for, for the PRs with the given ids
def run_sel(sel, ids):
    Selection = Invocation.Selection

    if sel.type == Selection.S_PR:
        if ids == []:
            sys.stderr.write("No PRs matched.\n")
            sys.exit(1)
        print_prs(ids, sel)
    elif sel.type == Selection.S_MESSAGE:
        if len(ids) != 1:
            sys.stderr.write("Cannot retrieve messages " +
                             "from multiple PRs.\n")
            sys.exit(1)
        print_message(ids[0], sel.message)
    elif sel.type == Selection.S_ATTACHMENT:
        if len(ids) != 1:
            sys.stderr.write("Cannot retrieve attachments " +
                             "from multiple PRs.\n")
            sys.exit(1)
        print_attachment(ids[0], sel.attachment)
    else:
        raise AssertionError("Unknown selection type")

# NOTE(review): list_fields, describe_field, and print_field_range
# are not defined anywhere in this file.
def run_op(op):
    Op = Invocation.Op

    if op.type == Op.OP_FIELDS:
        list_fields()
    elif op.type == Op.OP_SHOW:
        describe_field(op.field)
    elif op.type == Op.OP_RANGE:
        print_field_range(op.field)
    elif op.type == Op.OP_SEARCH:
        # by now op.search is expected to be sql text
        # XXX nothing stores the query arguments on the op yet
        args = getattr(op, "args", None)
        rows = querydb(op.search, args)
        # each row holds one PR id
        ids = [row[0] for row in rows]
        for s in op.sels:
            run_sel(s, ids)
    else:
        raise AssertionError("Unknown operation type")

def run(ast):
    for op in ast.ops:
        run_op(op)

############################################################
# compile (convert the AST so the searches are pure SQL)
#
# XXX this doesn't work, we need to keep the interned strings
# on return from compile_query.
#

# true if the regular expression rx matches somewhere in s
def matches(s, rx):
    # imported here rather than at the top of the file so this
    # function carries its own dependency
    import re
    return re.search(rx, s) is not None

# convert one query AST node to sql text
def compile_query(q):
    Q = Invocation.Query

    if q.type == Q.Q_QSTRING:
        # XXX should use a split that honors quotes
        terms = q.string.split()
        terms = [Q.dotstring(t) for t in terms]
        return compile_query(Q.doand(terms))
    if q.type == Q.Q_TSTRING:
        qb = QueryBuilder()
        s = q.string
        if matches(s, "^[0-9]+$"):
            f = qb.getfield("number")
            # Note: s is user-supplied but clean to insert directly
            qb.addwhere("%s = %s" % (f, s))
        elif matches(s, "^[0-9]+-[0-9]+$"):
            f = qb.getfield("number")
            ss = s.split("-")
            # Note: ss[] is user-supplied but clean
            qb.addwhere("%s >= %s" % (f, ss[0]))
            qb.addwhere("%s <= %s" % (f, ss[1]))
        elif matches(s, "^[0-9]+-$"):
            f = qb.getfield("number")
            ss = s.split("-")
            # Note: ss[] is user-supplied but clean
            qb.addwhere("%s >= %s" % (f, ss[0]))
        elif matches(s, "^-[0-9]+$"):
            f = qb.getfield("number")
            ss = s.split("-")
            # Note: ss[] is user-supplied but clean
            qb.addwhere("%s <= %s" % (f, ss[1]))
        elif matches(s, "^-[^:]+:[^:]+$"):
            # This has to be tested before the unnegated form,
            # whose pattern matches a leading "-" as well.
            # XXX honor quoted terms
            # XXX <> or NOT LIKE?
            # drop the leading "-" to get at the field name
            ss = s[1:].split(":")
            # ss[0] is not clean but if it's crap it won't match
            f = qb.getfield(ss[0])
            # ss[1] is not clean, so intern it for safety
            s = qb.intern(ss[1])
            qb.addwhere("%s <> %s" % (f, s))
        elif matches(s, "^[^:]+:[^:]+$"):
            # XXX honor quoted terms
            # XXX = or LIKE?
            ss = s.split(":")
            # ss[0] is not clean but if it's crap it won't match
            f = qb.getfield(ss[0])
            # ss[1] is not clean, so intern it for safety
            s = qb.intern(ss[1])
            qb.addwhere("%s = %s" % (f, s))
        elif matches(s, "^-"):
            # XXX <> or NOT LIKE?
            f = qb.getfield("alltext")
            # s is not clean, so intern it for safety (without the
            # leading "-", which is the negation and not text)
            s = qb.intern(s[1:])
            qb.addwhere("%s <> %s" % (f, s))
        else:
            # XXX = or LIKE?
            f = qb.getfield("alltext")
            # s is not clean, so intern it for safety
            s = qb.intern(s)
            qb.addwhere("%s = %s" % (f, s))

        # XXX also does not handle:
        #
        #    field: with no string (supposed to use a default
        #    search string)
        #
        #    generated search fields that parse dates:
        #    {arrived,closed,modified,etc.}-{before,after}:date
        #
        #    stale:time

        # build() takes a list of expressions to select
        return qb.build(["PRs.id"])
    # end Q_TSTRING case
    if q.type == Q.Q_OR:
        subqueries = ["(" + compile_query(sq) + ")" for sq in q.args]
        return " UNION ".join(subqueries)
    if q.type == Q.Q_AND:
        subqueries = ["(" + compile_query(sq) + ")" for sq in q.args]
        return " INTERSECT ".join(subqueries)
    if q.type == Q.Q_SQL:
        return q.sql
    raise AssertionError("Unknown query type")
# end compile_query

# convert one sort order to the sql for an ORDER BY item
def compile_order(qb, o):
    item = qb.getfield(o.field)
    if o.rev:
        # the sql keyword is DESC
        item = item + " DESC"
    return item

# convert a search to sql text
def compile_search(s):
    qb2 = QueryBuilder()

    # multiple query strings are treated as OR
    query = Invocation.Query.door(s.queries)
    query = compile_query(query)

    if s.openonly:
        qb2.addwhere("not %s" % qb2.getfield("closed"))
    if s.publiconly:
        qb2.addwhere("not %s" % qb2.getfield("confidential"))

    orders = [compile_order(qb2, o) for o in s.orders]
    order = ", ".join(orders)
    if order != "":
        qb2.setorder(order)

    if qb2.nonempty():
        # Wrap the search proper as a subquery (hence the
        # parentheses) and filter and sort its results. The subquery
        # selects PRs.id, so its one column is "id".
        qb2.addjoin("(" + query + ")", "search")
        qb2.addjoin("PRs")
        qb2.addwhere("search.id = PRs.id")
        query = qb2.build(["search.id"])

    return query
# end compile_search

def compile_op(op):
    if op.type == Invocation.Op.OP_SEARCH:
        op.search = compile_search(op.search)
    return op

# compile every operation in the AST, in place; the AST is also
# returned for the caller's convenience
def compile(ast):
    ast.ops = [compile_op(op) for op in ast.ops]
    return ast

############################################################
# arg handling

#
# I swear, all getopt interfaces suck.
#
# Provide an argparse action for constructing something out of the
# argument value and appending that somewhere, since it can't do this
# on its own.
#
# The way you use this:
#    p.add_argument("--foo", action = CtorAppend, dest = 'mylist',
#                   const = ctor)
# where ctor is a function taking the option arg(s) and producing
# a value to append to mylist.
#
# This itself is mangy even for what it is -- it seems like we should
# be able to pass action=CtorAppend(ctor), but since it has to be a
# class that doesn't work... unless you make a new class for every
# ctor you want to use, which seems completely insane.
#
# What the ctor is handed depends on nargs: with nargs=1 it gets the
# one argument string; with nargs=0 (a flag) it gets an empty list;
# otherwise it gets the list of argument strings. A '*' argument
# that was given no values appends nothing.
#

class CtorAppend(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        if self.nargs == 1:
            # argparse wraps the single value in a list
            arg = values[0]
        else:
            arg = values
            if self.nargs != 0 and arg == []:
                # nothing was supplied, so there is nothing to build
                return
        items = getattr(namespace, self.dest, None)
        if items is None:
            # the default is None, so make the list on first use
            items = []
        items.append(self.const(arg))
        setattr(namespace, self.dest, items)

# ctor for the positional search terms: they are read together as
# one search string
def doterms(terms):
    return Invocation.Query.doqstring(" ".join(terms))

# parse the command line and return the Invocation it describes
def getargs():
    p = argparse.ArgumentParser(description=program_description)

    # note: -h/--help is built in by default
    p.add_argument("-v", "--version",
                   action='version', version=program_version,
                   help="Print program version and exit")

    p.add_argument("--show", nargs=1,
                   action=CtorAppend, dest='ops',
                   const=Invocation.Op.doshow,
                   help="Show description of field")
    p.add_argument("--range", nargs=1,
                   action=CtorAppend, dest='ops',
                   const=Invocation.Op.dorange,
                   help="Show range of extant values for field")

    p.add_argument("--search", nargs=1,
                   action=CtorAppend, dest='queries',
                   const=Invocation.Query.doqstring,
                   help="Force string to be read as a search string")
    p.add_argument("-s", "--sql", nargs=1,
                   action=CtorAppend, dest='queries',
                   const=Invocation.Query.dosql,
                   help="Supply explicit sql query as search")

    # These store real booleans; the string "False" would be true.
    p.add_argument("--open",
                   action='store_const', dest='openonly', const=True,
                   help="Exclude closed PRs (default)")
    p.add_argument("--closed",
                   action='store_const', dest='openonly', const=False,
                   help="Include closed PRs in search")
    p.add_argument("--public",
                   action='store_const', dest='publiconly', const=True,
                   help="Exclude confidential PRs")
    p.add_argument("--privileged",
                   action='store_const', dest='publiconly', const=False,
                   help="Allow confidential PRs (default)")

    # These are flags, so they need nargs=0; a custom action otherwise
    # consumes the following argument.
    p.add_argument("--oldest", nargs=0,
                   action=CtorAppend, dest='orders',
                   const=Invocation.Order.dooldest,
                   help="Sort output with oldest PRs first")
    p.add_argument("--newest", nargs=0,
                   action=CtorAppend, dest='orders',
                   const=Invocation.Order.donewest,
                   help="Sort output with newest PRs first")
    p.add_argument("--staleness", nargs=0,
                   action=CtorAppend, dest='orders',
                   const=Invocation.Order.dostaleness,
                   help="Sort output by time since last modification")
    p.add_argument("--orderby", nargs=1,
                   action=CtorAppend, dest='orders',
                   const=Invocation.Order.dofield,
                   help="Sort output by specific field")
    p.add_argument("--revorderby", nargs=1,
                   action=CtorAppend, dest='orders',
                   const=Invocation.Order.dorevfield,
                   help="Sort output by specific field, reversed")

    p.add_argument("-m", "--message", nargs=1,
                   action=CtorAppend, dest='selections',
                   const=Invocation.Selection.domessage,
                   help="Print selected message (single PR only)")
    p.add_argument("-a", "--attachment", nargs=1,
                   action=CtorAppend, dest='selections',
                   const=Invocation.Selection.doattachment,
                   help="Print selected attachment (single PR only)")

    p.add_argument("-r", "--raw",
                   action = 'store_const', const="RAW",
                   dest = 'output',
                   help="Print exactly what the database returns")
    p.add_argument("-l", "--list",
                   action = 'store_const', const="LIST",
                   dest = 'output',
                   help="Print in list form (default)")
    p.add_argument("--headers",
                   action = 'store_const', const="HEADERS",
                   dest = 'output',
                   help="Print header information only")
    p.add_argument("--meta",
                   action = 'store_const', const="META",
                   dest = 'output',
                   help="Print all metadata")
    p.add_argument("--metadata",
                   action = 'store_const', const="META",
                   dest = 'output')
    p.add_argument("-f", "--full",
                   action = 'store_const', const="FULL",
                   dest = 'output',
                   help="Print everything")

    p.add_argument("--text",
                   action = 'store_const', const="TEXT",
                   dest = 'outformat',
                   help="Print in text format (default)")
    p.add_argument("--csv",
                   action = 'store_const', const="CSV",
                   dest = 'outformat',
                   help="Print a CSV file")
    p.add_argument("--xml",
                   action = 'store_const', const="XML",
                   dest = 'outformat',
                   help="Print in XML")
    p.add_argument("--json",
                   action = 'store_const', const="JSON",
                   dest = 'outformat',
                   help="Print in JSON")
    p.add_argument("--rdf",
                   action = 'store_const', const="RDF",
                   dest = 'outformat',
                   help="Print in RDF")
    p.add_argument("--rdflike",
                   action = 'store_const', const="RDFLIKE",
                   dest = 'outformat',
                   help="Print RDF-like text")

    # argparse does not accept dest for a positional argument; the
    # name is the destination and metavar is what the help shows.
    p.add_argument("queries", metavar="TERM", nargs='*',
                   action=CtorAppend,
                   const=doterms,
                   help="Search term")

    args = p.parse_args()

    ops = args.ops
    if ops is None:
        ops = []
    queries = args.queries
    if queries:
        # there is something to search for; fill in the defaults
        openonly = args.openonly
        if openonly is None:
            openonly = True
        publiconly = args.publiconly
        if publiconly is None:
            publiconly = False
        orders = args.orders
        if orders is None:
            orders = [Invocation.Order.dooldest(None)]
        output = args.output
        if output is None:
            output = "LIST"
        outformat = args.outformat
        if outformat is None:
            outformat = "TEXT"
        selections = args.selections
        if selections is None:
            sel = Invocation.Selection.dopr(output, outformat)
            selections = [sel]
        search = Invocation.Search(queries, openonly, publiconly, orders)
        op = Invocation.Op.dosearch(search, selections)
        ops.append(op)
    # endif

    return Invocation(ops)
# end getargs

############################################################
# main

def main():
    todo = getargs()
    opendb()
    try:
        fetch_classifications()
        # compile() rewrites the operations in place, so its result
        # is not needed here
        compile(todo)
        run(todo)
    finally:
        # close the database even if something above failed
        closedb()
    sys.exit(0)

if __name__ == "__main__":
    main()