Mercurial > ~dholland > hg > swallowtail > index.cgi
view shelltools/query-pr/query.py @ 34:ceecdbbb1364
some updates
author | David A. Holland |
---|---|
date | Mon, 27 May 2013 23:19:00 -0400 |
parents | 298c8a7f5181 |
children | 73e6dac29391 |
line wrap: on
line source
#!@PYTHON@
"""Search for and retrieve problem reports from the swallowtail database.

Command-line options are turned into one SQL SELECT per table family
(PRs, subscriptions, messages, adminlog, classification schemes); the
SELECTs are combined with INTERSECT and the matching PR ids are shown.

NOTE(review): the following helpers are referenced below but are not
defined in this file as shown; they must be supplied before the script
can run end to end: isregexp, intrange, yesno, daterange, getclassify,
get_attachment, get_message, show_prs.
"""

import argparse
import sys

program_description = """
Search for and retrieve problem reports.
"""
program_version = "@VERSION@"

############################################################
# settings

outfile = sys.stdout
outmaterial = "headers"
outformat = "text"

############################################################
# database

dblink = None


def opendb():
    """Open the database connection and store it in the global dblink."""
    global dblink

    # Imported here rather than at file level so that this module (and
    # the query-building code in it) can be imported without the
    # database driver installed.
    import psycopg2

    host = "localhost"
    user = "swallowtail"
    database = "swallowtail"
    dblink = psycopg2.connect("host=%s user=%s dbname=%s" %
                              (host, user, database))
# end opendb


def closedb():
    """Close the global database connection, if one is open."""
    global dblink
    if dblink is not None:
        dblink.close()
    dblink = None
# end closedb


def querydb(qtext, args=None):
    """Run one query and return all of its result rows.

    qtext is SQL text using "%s" placeholders; args is the sequence of
    values bound to those placeholders, in order (or None for no values).
    """
    # Debugging output. The single-argument print form behaves the
    # same under Python 2 and Python 3.
    print("Executing this query:")
    print(qtext)
    print("Args are:")
    print(args)

    cursor = dblink.cursor()
    try:
        cursor.execute(qtext, args)
        return cursor.fetchall()
    finally:
        # Release the cursor even if execute/fetchall raises.
        cursor.close()
# end querydb

############################################################
#

############################################################
# query class for searches


class Query(object):
    """Accumulates the pieces of a single SELECT statement.

    selections, tables and constraints hold SQL fragments; args holds
    the values bound to the "%s" placeholders that appear in the
    constraints, in the order the placeholders were issued.
    """

    # Not referenced anywhere in this file as shown.
    prtables = ["PRs"]
    prconstraints = []

    def __init__(self):
        self.selections = []
        self.tables = []
        self.constraints = []
        self.args = []

    def select(self, s):
        """Add a column expression to the SELECT list."""
        self.selections.append(s)

    def addtable(self, t):
        """Add a table (optionally "name as alias") to the FROM list."""
        assert t not in self.tables
        self.tables.append(t)

    def constrain(self, expr):
        """Add a boolean SQL expression; constraints are ANDed together."""
        self.constraints.append(expr)

    def internval(self, val):
        """Register val as a bound parameter and return its placeholder.

        The driver substitutes positional "%s" placeholders in order, so
        the returned placeholder must be used in the query text in the
        same order the values were registered.
        """
        self.args.append(val)
        return "%s"

    def textify(self):
        """Return the SQL text of this query, one clause per line."""
        parts = [
            "SELECT %s\n" % ",".join(self.selections),
            "FROM %s\n" % ",".join(self.tables),
        ]
        # A bare "WHERE" with nothing after it is not valid SQL.
        if self.constraints:
            parts.append("WHERE %s\n" % " AND ".join(self.constraints))
        return "".join(parts)
# end class Query


def regexp_constraint(q, field, value):
    """Constrain field to equal, or regexp-match, value in query q.

    The constraint is added to q; the expression text is also returned.
    """
    cleanval = q.internval(value)
    if not isregexp(value):
        expr = "%s = %s" % (field, cleanval)
    else:
        # "~" is the PostgreSQL case-sensitive POSIX regexp match
        # operator.
        expr = "%s ~ %s" % (field, cleanval)
    q.constrain(expr)
    return expr
# end regexp_constraint


def intrange_constraint(q, field, value):
    """Constrain field in query q to the integer range given by value.

    value is a (lower, upper) pair; either end may be None to leave
    that side of the range open.  Raises TypeError if a bound that is
    present is not an int.
    """
    (lower, upper) = value
    for (op, bound) in ((">=", lower), ("<=", upper)):
        if bound is None:
            continue
        if not isinstance(bound, int):
            raise TypeError("range bound for %s must be an int, not %r"
                            % (field, bound))
        # Safe to format inline: bound is known to be an integer.
        q.constrain("%s %s %d" % (field, op, bound))
# end intrange_constraint


def daterange_constraint(q, field, value):
    """Constrain field in query q to a date range (not implemented)."""
    # XXX
    # Raise instead of assert(0): asserts are stripped under -O, which
    # would silently drop the constraint.
    raise NotImplementedError("date range constraints are not implemented")
# end daterange_constraint

############################################################
# arg handling


def _make_parser(classification_schemes):
    """Build the command-line parser, one option per classification scheme
    in addition to the fixed options."""
    p = argparse.ArgumentParser(description=program_description)

    # note: -h/--help is built in by default
    p.add_argument("-v", "--version",
                   action='version', version=program_version,
                   help="Print program version and exit")

    # what to output
    p.add_argument("--short",
                   action='store_const', const="short", dest='outmaterial',
                   help="Output in short form (one per line)")
    p.add_argument("--headers",
                   action='store_const', const="headers", dest='outmaterial',
                   help="Output in default form (headers only)")
    p.add_argument("--full",
                   action='store_const', const="full", dest='outmaterial',
                   help="Output in full form (all material)")

    # NOTE(review): nargs=1 makes these values one-element lists; kept
    # as-is because get_attachment/get_message are not visible here.
    p.add_argument("--attach", nargs=1, type=int,
                   action='store', dest='attach',
                   help="Output specified attachment")
    p.add_argument("--message", nargs=1, type=int,
                   action='store', dest='message',
                   help="Output specified message")

    # output format
    p.add_argument("--text",
                   action='store_const', const="text", dest='outformat',
                   help="Output as text (default)")
    p.add_argument("--csv",
                   action='store_const', const="csv", dest='outformat',
                   help="Output as comma-separated values")
    p.add_argument("--xml",
                   action='store_const', const="xml", dest='outformat',
                   help="Output as XML")
    p.add_argument("--json",
                   action='store_const', const="json", dest='outformat',
                   help="Output as JSON")
    p.add_argument("--rdf",
                   action='store_const', const="rdf", dest='outformat',
                   help="Output as RDF")
    p.add_argument("--rdflike",
                   action='store_const', const="rdflike", dest='outformat',
                   help="Output as RDF-like text")

    # search scope
    p.add_argument("--closed",
                   action='store_true', dest='closed', default=False,
                   help="Search closed PRs as well as open")
    p.add_argument("--public",
                   action='store_true', dest='public', default=False,
                   help="Search only public information")

    # Search options take exactly one value each. They deliberately do
    # not use nargs=1, which would wrap every value in a list.
    p.add_argument("--number", type=intrange, dest='number',
                   help="Restrict search to a range of PRs by number")
    p.add_argument("--synopsis", dest='synopsis',
                   help="Search PRs by their synopsis field")
    # No choices= on the yes/no options: argparse checks choices after
    # the type conversion, and the code below requires a bool.
    p.add_argument("--confidential", type=yesno, metavar="{yes,no}",
                   dest='confidential',
                   help="Search PRs by their confidentiality setting")
    p.add_argument("--state", dest='state',
                   help="Search PRs by their state")
    p.add_argument("--locked", type=yesno, metavar="{yes,no}",
                   dest='locked',
                   help="Search PRs by their locked setting")
    p.add_argument("--arrival-schemaversion", type=intrange,
                   dest='arrival_schemaversion',
                   help="Search PRs by their original schema version")
    p.add_argument("--arrival-date", type=daterange,
                   dest='arrival_date',
                   help="Search PRs by arrival date")
    p.add_argument("--closed-date", type=daterange,
                   dest='closed_date',
                   help="Search PRs by the date they were closed")
    p.add_argument("--last-modified", type=daterange,
                   dest='last_modified',
                   help="Search PRs by the date they were last modified")
    p.add_argument("--release", dest='release',
                   help="Search PRs by their release information")
    p.add_argument("--environment", dest='environment',
                   help="Search PRs by their environment information")
    p.add_argument("--originator-name", dest='originator_name',
                   help="Search PRs by the name of their originator")
    p.add_argument("--originator-email", dest='originator_email',
                   help="Search PRs by the email of their originator")
    p.add_argument("--originator-id", type=intrange,
                   dest='originator_id',
                   help="Search PRs by the database ID of their originator")
    p.add_argument("--responsible", dest='responsible',
                   help="Search PRs by who's responsible for them")
    p.add_argument("--respondent", dest='respondent',
                   help="Search PRs by who's a respondent")
    p.add_argument("--subscribed", dest='subscribed',
                   help="Search PRs by who's subscribed")
    p.add_argument("--messages", dest='messages',
                   help="Search PRs by text in the message log")
    p.add_argument("--adminlog", dest='adminlog',
                   help="Search PRs by text in the administrative log")
    p.add_argument("--anytext", dest='anytext',
                   help="Search PRs by text in various fields")

    for scheme in classification_schemes:
        p.add_argument("--%s" % scheme, dest=scheme,
                       help="Search PRs by the %s classification scheme"
                       % scheme)

    # Positional arguments take their dest from the name, and nargs='*'
    # already collects them into a (possibly empty) flat list.
    p.add_argument("prs", nargs='*', metavar="PR", type=int,
                   help="Specific PRs to retrieve by number")
    return p
# end _make_parser


def _flag_constraint(q, column, value):
    """Constrain a boolean column in query q to be true or false."""
    if not isinstance(value, bool):
        raise TypeError("value for %s must be a bool, not %r"
                        % (column, value))
    if value:
        q.constrain(column)
    else:
        q.constrain("not %s" % column)
# end _flag_constraint


def _subscription_query(who, flagcolumn=None):
    """Build a query for PRs that user `who` has a subscription to.

    If flagcolumn is given, that boolean subscriptions column must
    also be set.
    """
    sq = Query()
    sq.select("subscriptions.pr as id")
    sq.addtable("subscriptions")
    # The table is not aliased, so the constraints can refer to it as
    # "users".
    sq.addtable("users")
    sq.constrain("subscriptions.userid = users.id")
    regexp_constraint(sq, "users.realname", who)
    if flagcolumn is not None:
        sq.constrain(flagcolumn)
    return sq
# end _subscription_query


def _collect_queries(args, classification_schemes,
                     classification_schemetypes):
    """Turn parsed search options into a list of Query objects whose
    results are to be intersected."""

    # 1. Constraints on the PRs table
    checkprtable = False
    prq = Query()
    prq.select("PRs.id as id")
    prq.addtable("PRs")
    if not args.closed:
        checkprtable = True
        prq.addtable("states")
        prq.constrain("PRs.state = states.name")
        prq.constrain("states.closed = FALSE")
    if args.public:
        checkprtable = True
        prq.constrain("NOT PRs.confidential")
    if args.number is not None:
        checkprtable = True
        intrange_constraint(prq, "PRs.id", args.number)
    if args.synopsis is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.synopsis", args.synopsis)
    if args.confidential is not None:
        checkprtable = True
        _flag_constraint(prq, "PRs.confidential", args.confidential)
    if args.state is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.state", args.state)
    if args.locked is not None:
        checkprtable = True
        _flag_constraint(prq, "PRs.locked", args.locked)
    if args.arrival_schemaversion is not None:
        checkprtable = True
        intrange_constraint(prq, "PRs.arrival_schemaversion",
                            args.arrival_schemaversion)
    if args.arrival_date is not None:
        checkprtable = True
        daterange_constraint(prq, "PRs.arrival_date", args.arrival_date)
    if args.closed_date is not None:
        checkprtable = True
        daterange_constraint(prq, "PRs.closed_date", args.closed_date)
    if args.last_modified is not None:
        checkprtable = True
        daterange_constraint(prq, "PRs.last_modified", args.last_modified)
    if args.release is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.release", args.release)
    if args.environment is not None:
        checkprtable = True
        regexp_constraint(prq, "PRs.environment", args.environment)

    if args.originator_name is not None or \
       args.originator_email is not None:
        prq.addtable("usermail as originator")
        prq.constrain("PRs.originator = originator.id")
    if args.originator_name is not None:
        checkprtable = True
        regexp_constraint(prq, "originator.realname", args.originator_name)
    if args.originator_email is not None:
        checkprtable = True
        regexp_constraint(prq, "originator.email", args.originator_email)
    if args.originator_id is not None:
        checkprtable = True
        intrange_constraint(prq, "PRs.originator", args.originator_id)

    queries = []
    if checkprtable:
        queries.append(prq)

    # 2. Constraints on subscriptions
    if args.responsible is not None:
        queries.append(_subscription_query(args.responsible,
                                           "subscriptions.responsible"))
    if args.respondent is not None:
        queries.append(_subscription_query(args.respondent,
                                           "subscriptions.reporter"))
    if args.subscribed is not None:
        queries.append(_subscription_query(args.subscribed))

    # 3. Constraints on the message and administrative logs
    if args.messages is not None:
        mq = Query()
        mq.select("messages.pr as id")
        mq.addtable("messages")
        regexp_constraint(mq, "messages.text", args.messages)
        queries.append(mq)

    if args.adminlog is not None:
        aq = Query()
        aq.select("adminlog.pr as id")
        aq.addtable("adminlog")
        regexp_constraint(aq, "adminlog.change", args.adminlog)
        regexp_constraint(aq, "adminlog.comment", args.adminlog)
        assert len(aq.constraints) == 2
        # Either column may match. Parenthesized so the OR stays
        # intact if further constraints are ever ANDed on.
        aq.constraints = ["(%s OR %s)" % (aq.constraints[0],
                                          aq.constraints[1])]
        queries.append(aq)

    # 4. Constraints on the classification schemes
    for scheme in classification_schemes:
        # argparse namespaces are not subscriptable.
        value = getattr(args, scheme)
        if value is None:
            continue
        schemetype = classification_schemetypes[scheme]
        # NOTE(review): the original also computed
        # "%sclass_data" % schemetype but never used it; confirm
        # which table name is the right one here.
        cq = Query()
        cq.select("scheme.pr as id")
        cq.addtable("%s as scheme" % schemetype)
        cq.constrain("scheme.scheme = %s" % cq.internval(scheme))
        regexp_constraint(cq, "scheme.value", value)
        queries.append(cq)
    # end loop

    # With no constraints at all, search the whole PRs table rather
    # than producing an empty query.
    if not queries:
        queries.append(prq)
    return queries
# end _collect_queries


def getargs(classification_schemes, classification_schemetypes):
    """Parse the command line and build the search query.

    Updates the global outmaterial/outformat settings.  Requests that
    are not searches (--attach, --message, explicit PR numbers) are
    carried out here and the program exits.

    Returns (querytext, queryargs): the SQL text and the list of values
    bound to its "%s" placeholders, in order.
    """
    global outmaterial
    global outformat

    p = _make_parser(classification_schemes)
    args = p.parse_args()

    if args.outmaterial is not None:
        outmaterial = args.outmaterial
    if args.outformat is not None:
        outformat = args.outformat

    # If we're doing something other than a search, do it now
    if args.attach is not None:
        get_attachment(args.attach)
        sys.exit(0)
    if args.message is not None:
        get_message(args.message)
        sys.exit(0)
    if args.prs:
        show_prs(args.prs)
        sys.exit(0)

    if args.anytext is not None:
        # Prints usage plus this message and exits with status 2.
        p.error("--anytext isn't supported yet")

    #
    # Collect up the search constraints
    #
    queries = _collect_queries(args, classification_schemes,
                               classification_schemetypes)

    querytexts = [q.textify() for q in queries]
    # Placeholders are positional, so the values must be concatenated
    # in the same order as the query texts.
    queryargs = []
    for q in queries:
        queryargs.extend(q.args)
    return ("INTERSECT\n".join(querytexts), queryargs)
# end getargs

############################################################
# main


def main():
    """Run the search; return 0 if any PRs matched, 1 otherwise."""
    opendb()
    try:
        (classification_schemes, classification_schemetypes) = getclassify()
        (query, queryargs) = getargs(classification_schemes,
                                     classification_schemetypes)
        rows = querydb(query, queryargs)
        # Each result row is a one-column sequence holding the PR id.
        ids = [row[0] for row in rows]
        if not ids:
            sys.stderr.write("No PRs matched.\n")
            return 1
        show_prs(ids)
        return 0
    finally:
        # Also runs when getargs exits early via sys.exit().
        closedb()
# end main


# real python hackers doubtless laugh at this
if __name__ == "__main__":
    sys.exit(main())