Mercurial > ~dholland > hg > swallowtail > index.cgi
view shelltools/query-pr/query.py @ 46:73e6dac29391
new stuff (checkpoint when moved between machines)
author | David A. Holland |
---|---|
date | Tue, 12 Aug 2014 21:55:08 -0400 |
parents | 298c8a7f5181 |
children | bcd1d06838fd |
line wrap: on
line source
#!@PYTHON@ import argparse import psycopg2 program_description = """ Search for and retrieve problem reports. """ program_version = "@VERSION@" ############################################################ # settings outfile = sys.stdout outmaterial = "headers" outformat = "text" ############################################################ # database dblink = None def opendb(): global dblink host = "localhost" user = "swallowtail" database = "swallowtail" dblink = psycopg2.connect("host=%s user=%s dbname=%s" % (host, user, database)) # end opendb def closedb(): global dblink dblink.close() dblink = None # end closedb def querydb(qtext, args): print "Executing this query:" print qtext print "Args are:" print args cursor = dblink.cursor() cursor.execute(qtext, args) result = cursor.fetchall() cursor.close() return result # end querydb ############################################################ # query class for searches class Query: __init__(self): self.selections = [] self.tables = [] self.constraints = [] self.args = [] prtables = ["PRs"] prconstraints = [] def select(self, s): self.selections.append(s) def addtable(self, t): assert(t not in self.tables) self.tables.append(t) def constrain(self, expr): self.constraints.append(t) def internval(self, val): num = len(self.args) self.args[num] = val return "$%d" % num def textify(self): s = "SELECT %s\n" % ",".join(self.selections) f = "FROM %s\n" % ",".join(self.tables) w = "WHERE %s\n" % " AND ".join(self.constraints) return s + f + w # end class Query def regexp_constraint(q, field, value): cleanval = q.internval(value) if not isregexp(value): return "%s = %s" % (field, cleanval) else: # XXX what's the right operator again? return "%s ~= %s" % (field, cleanval) # end regexp_constraint def intrange_constraint(q, field, value): (lower, upper) = args.number if lower is not None: assert(typeof(lower) == int) prq.constrain("%s >= %d" % (field, lower)) if upper is not None: assert(typeof(upper) == int) prq.constrain("%s <= %d" % (field, upper)) # end intrange_constraint def daterange_constraint(q, field, value): # XXX assert(0) # end daterange_constraint ############################################################ # If we're doing something other than a search, do it now if args.attach is not None: get_attachment(args.attach) exit(0) if args.message is not None: get_message(args.message) exit(0) if args.prs is not None and len(args.prs) > 0: show_prs(args.prs) exit(0) # # Collect up the search constraints # # 1. Constraints on the PRs table checkprtable = False prq = Query() prq.select("PRs.id as id") prq.addtable("PRs") if not args.closed: checkprtable = True prq.addtable("states") prq.constrain("PRs.state = states.name") prq.constrain("states.closed = FALSE") if args.public: checkprtable = True prq.constrain("NOT PRs.confidential") if args.number is not None: checkprtable = True intrange_constraint(prq, "PRs.id", args.number) if args.synopsis is not None: checkprtable = True regexp_constraint(prq, "PRs.synopsis", args.synopsis) if args.confidential is not None: checkprtable = True assert(typeof(args.confidential) == bool) if args.confidential: prq.constrain("PRs.confidential") else: prq.constrain("not PRs.confidential") if args.state is not None: checkprtable = True regexp_constraint(prq, "PRs.state", args.state) if args.locked is not None: checkprtable = True assert(typeof(args.locked) == bool) if args.locked: prq.constrain("PRs.locked") else: prq.constrain("not PRs.locked") if args.arrival_schemaversion is not None: checkprtable = True intrange_constraint(prq, "PRs.arrival_schemaversion", args.arrival_schemaversion) if args.arrival_date is not None: checkprtable = True daterange_constraint(prq, "PRs.arrival_date", args.arrival_date) if args.closed_date is not None: checkprtable = True daterange_constraint(prq, "PRs.closed_date", args.closed_date) if args.last_modified is not None: checkprtable = True daterange_constraint(prq, "PRs.last_modified", args.last_modified) if args.release is not None: checkprtable = True regexp_constraint(prq, "PRs.release", args.release) if args.environment is not None: checkprtable = True regexp_constraint(prq, "PRs.environment", args.environment) if args.originator_name is not None or args.originator_email is not None: prq.addtable("usermail as originator") prq.constrain("PRs.originator = originator.id") if args.originator_name is not None: checkprtable = True regexp_constraint(prq, "originator.realname", args.originator_name) if args.originator_email is not None: checkprtable = True regexp_constraint(prq, "originator.email", args.originator_name) if args.originator_id is not None: checkprtable = True intrange_constraint(prq, "PRs.originator", args.originator_id) queries = [] if checkprtable: queries.append(prq) if args.responsible is not None: sq = Query() sq.select("subscriptions.pr as id") sq.addtable("subscriptions") sq.addtable("users") sq.constrain("subscriptions.userid = users.id") regexp_constraint(sq, "users.realname", args.responsible) sq.constrain("subscriptions.responsible") queries.append(sq) if args.respondent is not None: sq = Query() sq.select("subscriptions.pr as id") sq.addtable("subscriptions") sq.addtable("users as subscribed") sq.constrain("subscriptions.userid = users.id") regexp_constraint(sq, "users.realname", args.respondent) sq.constrain("subscriptions.reporter") queries.append(sq) if args.subscribed is not None: sq = Query() sq.select("subscriptions.pr as id") sq.addtable("subscriptions") sq.addtable("users as subscribed") sq.constrain("subscriptions.userid = users.id") regexp_constraint(sq, "users.realname", args.subscribed) queries.append(sq) if args.messages is not None: mq = Query() mq.select("messages.pr as id") mq.addtable("messages") regexp_constraint(sq, "messages.text", args.messages) queries.append(mq) if args.adminlog is not None: aq = Query() aq.select("adminlog.pr as id") aq.addtable("adminlog") regexp_constraint(sq, "adminlog.change", args.adminlog) regexp_constraint(sq, "adminlog.comment", args.adminlog) assert(len(aq.constraints) == 2) x = "%s OR %s" % (aq.constraints[0], aq.constraints[1]) aq.constraints = [x] queries.append(aq) if args.anytext is not None: choke("--anytext isn't supported yet") for scheme in classification_schemes: if args[scheme] is not None: schemetype = classification_schemetypes[scheme] tbl = "%sclass_data" % schemetype cq = Query() cq.select("scheme.pr as id") cq.addtable("%s as scheme" % schemetype) cq.constrain("scheme.scheme = '%s'" % scheme) regexp_constraint(cq, "scheme.value", args[scheme]) queries.append(cq) # end loop querytexts = [q.textify() for q in queries] return "INTERSECT\n".join(querytexts) ############################################################ # arg handling class Invocation: class Op: # operation codes OP_FIELDS = 1 OP_SHOW = 2 OP_RANGE = 3 OP_SEARCH = 4 def __init__(self, op): self.op = op def dofields(): return Op(OP_FIELDS) def doshow(field): self = Op(OP_SHOW) self.field = field return self def dorange(field): self = Op(OP_RANGE) self.field = field return self def doquery( # output formats FMT_TXT = 1 def __init__(self): self.op = OP_SEARCH self.searchstrings = [] self.searchsql = [] # end Invocation def getargs(): p = argparse.ArgumentParser(program_description) # note: -h/--help is built in by default p.add_argument("-v", "--version", action='version', version=program_version, help="Print program version and exit") p.add_argument("--show", nargs=1, action = 'store', dest='show', help="Show description of field") p.add_argument("--range", nargs=1, action = 'store', dest='range', help="Show range of extant values for field") p.add_argument("--search", nargs=1, action = 'store', dest='search', help="Force string to be read as a search string") p.add_argument("-s", "--sql", nargs=1, action = 'store', dest='sql', help="Supply explicit sql query as search") p.add_argument("--open", action = 'store_const', const="True", dest = 'openonly', help="Exclude closed PRs (default)") p.add_argument("--closed", action = 'store_const', const="False", dest = 'openonly', help="Include closed PRs in search") p.add_argument("--public", action = 'store_const', const="True", dest = 'publiconly', help="Exclude confidential PRs") p.add_argument("--privileged", action = 'store_const', const="False", dest = 'publiconly', help="Allow confidential PRs (default)") p.add_argument("--oldest", action = 'store_const', const="OLDEST", dest = 'order', help="Sort output with oldest PRs first") p.add_argument("--newest", action = 'store_const', const="NEWEST", dest = 'order', help="Sort output with newest PRs first") p.add_argument("--staleness", action = 'store_const', const="STALENESS", dest = 'order', help="Sort output by time since last modification") p.add_argument("--orderby", nargs=1, action = 'store', dest = 'orderfield', help="Sort output by specific field") p.add_argument("--revorderby", nargs=1, action = 'store', dest = 'revorderfield', help="Sort output by specific field, reversed") p.add_argument("-m", "--message", nargs=1, action = 'store', dest = 'message', help="Print selected message (single PR only)") p.add_argument("-a", "--attachment", nargs=1, action = 'store', dest = 'attachment', help="Print selected attachment (single PR only)") p.add_argument("-r", "--raw", action = 'store_const', const="RAW", dest = 'output', help="Print exactly what the database returns") p.add_argument("-l", "--list", action = 'store_const', const="LIST", dest = 'output', help="Print in list form (default)") p.add_argument("--headers", action = 'store_const', const="HEADERS", dest = 'output', help="Print header information only") p.add_argument("--meta", action = 'store_const', const="META", dest = 'output', help="Print all metadata") p.add_argument("--metadata", action = 'store_const', const="META", dest = 'output') p.add_argument("-f", "--full", action = 'store_const', const="FULL", dest = 'output', help="Print everything") p.add_argument("--text", action = 'store_const', const="TEXT", dest = 'outformat', help="Print in text format (default)") p.add_argument("--csv", action = 'store_const', const="CSV", dest = 'outformat', help="Print a CSV file") p.add_argument("--xml", action = 'store_const', const="XML", dest = 'outformat', help="Print in XML") p.add_argument("--json", action = 'store_const', const="JSON", dest = 'outformat', help="Print in JSON") p.add_argument("--rdf", action = 'store_const', const="RDF", dest = 'outbformat', help="Print in RDF") p.add_argument("--rdflike", action = 'store_const', const="RDFLIKE", dest = 'outformat', help="Print RDF-like text") args = p.parse_args() p = Invocation() if args.show is not None: do_showfield(args.show) exit(0) if args.range is not None: do_fieldrange(args.range) exit(0) searchstring = args.search explicitsql = args.sql openonly = args.openonly if openonly is None: openonly = True publiconly = args.publiconly if publiconly is None: publiconly = False if args.orderfield is not None: orderby = args.orderfield orderrev = False elif args.revorderfield is not None: orderby = args.revorderfield orderrev = True elif args.order == "OLDEST": orderby = "number" orderrev = False elif args.order == "NEWEST": orderby = "number" orderrev = True elif args.order == "STALENESS": orderby = "last-modified" orderrev = True else: orderby = "number" orderrev = False if args.message is not None: printwhat = "MESSAGE" printwhich = args.message elif args.attachment is not None: printwhat = "ATTACHMENT" printwhich = args.attachment else: printwhat = "PR" printwhich = None output = args.output if output is None: output = "LIST" outformat = args.outformat if outformat is None: outformat = "TEXT" query = buildquery(searchstring, explicitsql, orderby=orderby, orderrev=orderrev, openonly=openonly, publiconly=publiconly) if printwhat == "PR": printer = buildprinter(output, outformat) else if printwhat == "MESSAGE": printer = getmessage(printwhich) else if printwhat == "ATTACHMENT": printer = getattachment(printwhich) return (query, printer) # end getargs ############################################################ # main def main(): opendb() (classification_schemes, classification_schemetypes) = getclassify() query = getargs(classification_schemes, classification_schemetypes) ids = querydb(query) if len(ids) > 0: show_prs(ids) else: sys.stderr.write("No PRs matched.\n") exit(1) closedb() return 0 # end main # real python hackers doubtless laugh at this exit(main())