This repository was archived by the owner on May 13, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdb.py
More file actions
executable file
·767 lines (642 loc) · 26.5 KB
/
db.py
File metadata and controls
executable file
·767 lines (642 loc) · 26.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
"""
DB-SIG compliant module for CDR database access.
"""
import platform
import unittest
import pyodbc
from cdrapi import settings
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
def connect(**opts):
"""
Connect to the CDR database using known login account.
Optional keyword arguments:
user - string for database account name (default Query.CDRSQLACCOUNT)
server - string in the form hostname,port
tier - tier name string (e.g., 'PROD') or Tier object
database - initial db for the connection (default Query.DB)
timeout - time to wait before giving up (default Query.DEFAULT_TIMEOUT)
autocommit - if True, don't wrap db writes in transactions
"""
tier = opts.get("tier") or settings.Tier()
if isinstance(tier, bytes):
tier = settings.Tier(tier.decode("utf-8"))
elif isinstance(tier, str):
tier = settings.Tier(tier)
timeout = opts.get("timeout", Query.DEFAULT_TIMEOUT)
autocommit = opts.get("autocommit", False)
user = opts.get("user", Query.CDRSQLACCOUNT)
if user == "cdr":
user = Query.CDRSQLACCOUNT
password = tier.password(user, Query.DB)
if not password:
raise Exception("user {!r} unknown on {!r}".format(user, tier.name))
server = opts.get("server") or f"{tier.sql_server},{tier.port(Query.DB)}"
parms = dict(
Driver="{ODBC Driver 17 for SQL Server}",
Server=server,
Database=opts.get("database", Query.DB),
Uid=user,
Pwd=password,
Timeout=timeout
)
if platform.system().lower() != "windows":
parms["Driver"] = "{ODBC Driver 18 for SQL Server}"
parms["Encrypt"] = "yes"
parms["TrustServerCertificate"] = "yes"
conn_string = ";".join(["{}={}".format(*p) for p in parms.items()])
if opts.get("debug"):
print(conn_string)
opts = dict(timeout=timeout, autocommit=autocommit)
conn = pyodbc.connect(conn_string, **opts)
conn.timeout = timeout
return conn
class Query:
"""
Builder for SQL select queries.
Example usage:
query = cdrdb.Query('t1 a', 'a.title', 'b.name AS "Type"')
query.join('t2 b', 'b.id = a.t2')
query.where(query.Condition('b.name', ('Foo', 'Bar'), 'IN'))
# To see the generated SQL
print(query)
# To execute and cleanup
cursor = query.execute()
rows = cursor.fetchall()
cursor.close()
# Or alternatively if closing the cursor doesn't matter
rows = query.execute().fetchall()
"""
PLACEHOLDER = "?"
DEFAULT_TIMEOUT = 120
CDRSQLACCOUNT = "cdrsqlaccount"
DB = "CDR"
def __init__(self, table, *columns):
"""
Initializes a SQL query builder
Passed:
table table name with possible alias
columns one or more column names to be selected,
qualified with alias if necessary; a column
can be an expression
"""
self._table = table
self._columns = columns
self._joins = []
self._where = []
self._group = []
self._having = []
self._order = []
self._parms = []
self._unions = []
self._timeout = self.DEFAULT_TIMEOUT
self._alias = None
self._into = None
self._cursor = None
self._limit = None
self._unique = False
self._str = None
self._outer = False
self._logger = None
def timeout(self, value):
"""
Override the default timeout of 120 seconds with a new value.
"""
self._timeout = int(value)
return self
def join(self, table, *conditions):
"""
Join to an additional table (or view)
Each condition can be a simple string (e.g., 't.id = d.doc_type')
or a more complicated Condition or Or object.
If you don't supply at least one condition, you might be
unpleasantly surprised by the results. :-)
"""
self._joins.append(Query.Join(table, False, *conditions))
self._str = None
return self
def outer(self, table, *conditions):
"""
Create a left outer join
Sets the self._outer flag so the formatter knows to add
extra left padding to the query as needed. Otherwise works
the same as the join() method.
"""
self._joins.append(Query.Join(table, True, *conditions))
self._outer = True
self._str = None
return self
def where(self, condition):
"""
Adds a condition for the query's WHERE clause
A condition can be a simple string (e.g., 't.id = d.doc_type')
or a more complicated Condition or Or object.
"""
self._where.append(condition)
self._str = None
return self
def group(self, *columns):
"""
Adds one or more columns to be used in the query's GROUP BY clause
Example usage:
query.group('d.id', 'd.title')
"""
for column in columns:
Query._add_sequence_or_value(column, self._group)
self._str = None
return self
def having(self, condition):
"""
Adds a condition to the query's HAVING clause
A condition can be a simple string (e.g., 't.id = d.doc_type')
or a more complicated Condition or Or object.
"""
self._having.append(condition)
self._str = None
return self
def union(self, query):
"""
Add a query to be UNIONed with this one.
Use this when you want to apply the ORDER BY clause to the UNIONed
queries as a whole. Make sure only this query has an ORDER set.
If you need each component query to maintain its own internal order,
construct and serialize each separately, and assemble them by hand.
Be sure you assign aliases to each of the virtual tables.
For example:
q1 = cdrdb.Query(...).join.(...).where(...).order(...).alias(...)
q2 = cdrdb.Query(...).join.(...).where(...).order(...).alias(...)
union = cdrdb.Query(q1, "*").union(cdrdb.Query(q2, "*"))
For a more straightforward use of `union()`, not involving
virtual tables, see the seventh unit test near the bottom of this
file.
"""
self._unions.append(query)
self._str = None
return self
def order(self, *columns):
"""
Add the column(s) to be used to sort the results
Example usage:
query.order('doc_type.name', 'version.dt DESC')
"""
temp = []
for column in columns:
Query._add_sequence_or_value(str(column), temp)
for column in temp:
column = column.strip()
words = column.split()
if len(words) > 2:
raise Exception("invalid order column %s" % repr(column))
if len(words) == 2 and words[1].upper() not in ("ASC", "DESC"):
raise Exception("invalid order column %s" % repr(column))
self._order.append(" ".join(words))
self._str = None
return self
def limit(self, limit):
"""
Sets maximum number of rows to return
"""
if not isinstance(limit, int):
raise Exception("limit must be integer")
self._limit = limit
self._str = None
return self
def unique(self):
"""
Requests that duplicate rows be eliminated
"""
self._unique = True
self._str = None
return self
def cursor(self, cursor):
"""
Pass in a cursor to be used for the query.
"""
self._cursor = cursor
return self
def execute(self, cursor=None, timeout=None):
"""
Assemble and execute the SQL query, returning the cursor object
As with the Miranda rule, if you do not supply a cursor,
one will be provided for you.
Note that the temporary 'sql' variable is assigned before
invoking the cursor's execute() method, to make sure that
the _parms sequence has been constructed.
"""
if not cursor:
if not timeout:
timeout = self._timeout
conn = connect(user="CdrGuest", timeout=timeout)
cursor = conn.cursor()
sql = str(self)
cursor.execute(sql, tuple(self._parms))
return cursor
def alias(self, alias):
"""
Assigns an alias for a query so that it can be used as a virtual
table as the target of a FROM clause:
SELECT xxx.this, xxx.that, yyy.other
FROM (
SELECT this, that
FROM whatever
) AS xxx
Example usage:
q1 = cdrdb.Query('whatever', 'this', 'that').alias('xxx')
q2 = cdrdb.Query(q1, 'xxx.this', 'xxx.that', 'yyy.other')
q2.join('other_table yyy', ...)
"""
self._alias = alias
self._str = None
return self
def parms(self):
"""
Accessor method for query parameters
Return the list of parameters to be passed to the database
engine for the execution of the query. Will be in the
correct order, matching the position of the corresponding
placeholders in the query string.
"""
# Make sure the parameters have been assembled.
if self._str is None:
str(self)
return self._parms
def _align(self, keyword, rest=""):
"""
Internal helper method to make the SQL query easier to read
"""
keyword = " " * self._indent + keyword
return "%s %s" % (keyword[-self._indent:], rest)
def into(self, name):
"""
Specify name of table to be created by this query
Prefix the name with the octothorpe character ('#') to create
a temporary table.
"""
self._into = name
self._str = None
return self
def log(self, **parms):
if self._logger is None:
tier = settings.Tier()
self._logger = tier.get_logger("db", level="DEBUG")
label = parms.get("label", "QUERY")
output = "%s:\n%s" % (label, self)
if self._parms:
parms = ["PARAMETERS:"] + [repr(p) for p in self._parms]
output += "\n" + "\n\t".join(parms)
self._logger.debug(output)
def __str__(self):
"""
Assemble the query for execution or logging.
The format of the query string is arranged to make reading
by a human easier. The assembled query is cached.
A side effect of a call to this method is that the sequence
of all parameters to be passed to the database engine for
execution of the query is constructed as the '_parms' member.
"""
# If our cached string is not stale, use it.
if self._str:
return self._str
# Start with a fresh paramater list.
self._parms = []
# Start the select statement, and calculate needed left padding.
select = "SELECT"
if self._unique:
select += " DISTINCT"
if self._limit is not None:
select += " TOP %d" % self._limit
self._indent = len(select)
pairs = (
(self._order, "ORDER BY"),
(self._outer, "LEFT OUTER JOIN"),
(self._group, "GROUP BY"),
)
for attribute, keywords in pairs:
if attribute:
needed = len(keywords) - self._indent
if needed > 0:
self._indent += needed
query = [self._align(select, ", ".join(self._columns))]
# Add clause to store results in a new table if requested.
if self._into:
query.append(self._align("INTO", self._into))
# Is the base table itself a query?
if isinstance(self._table, Query):
# Make sure it has an alias.
alias = self._table._alias
if not alias:
raise Exception("Virtual tables must have an alias")
# SQL Server won't accept placeholders here.
str(self._table)
if self._table._parms:
raise Exception("Placeholders not allowed in virtual table")
# Add the indented query in parentheses.
query.append(self._align("FROM", "("))
query.append(Query.indent("%s) %s" % (self._table, alias)))
# No: just a plain vanilla FROM clause.
else:
query.append(self._align("FROM", self._table))
# Add JOIN clauses for any additional tables used for the query.
for join in self._joins:
self._serialize_join(query, join)
# Add the conditions used to restrict the set of results.
keyword = "WHERE"
for condition in self._where:
self._serialize_condition(query, keyword, condition)
keyword = "AND"
# If the query uses aggregates, specify column for the grouping.
if self._group:
query.append(self._align("GROUP BY", ", ".join(self._group)))
# Specify any restrictions on the results based on aggregations.
if self._having:
keyword = "HAVING"
for condition in self._having:
self._serialize_condition(query, keyword, condition)
keyword = "AND"
# Add any queries to be spliced to this one
for union in self._unions:
query.append(self._align("UNION"))
query.append(str(union))
# Specify the sorting of the result set if requested.
if self._order:
query.append(self._align("ORDER BY", ", ".join(self._order)))
# Assemble everything and cache the results.
self._str = "\n".join(query)
# Give the caller the resulting SQL.
return self._str
def _serialize_or_set(self, query, keyword, or_set, prefix, suffix):
"""
Internal helper method for building the query string
This method has four responsibilities:
1. Wrap the set of OR conditions in properly balanced parentheses
2. Connect the conditions with the "OR" keyword
3. Hand of serialization of each condition to _serialize_condition
4. Connect nested sequences of conditions with the "AND" keyword
"""
open_paren = "(" + prefix
close_paren = ""
for i, condition in enumerate(or_set.conditions):
last_or = (i == len(or_set.conditions) - 1)
if type(condition) in (tuple, list):
for j, c in enumerate(condition):
if last_or and j == len(condition) - 1:
close_paren = suffix + ")"
self._serialize_condition(query, keyword, c,
open_paren, close_paren)
keyword = "AND"
open_paren = ""
else:
if last_or:
close_paren = suffix + ")"
self._serialize_condition(query, keyword, condition,
open_paren, close_paren)
keyword = "OR"
open_paren = ""
def _serialize_condition(self, query, keyword, condition, prefix="",
suffix=""):
"""
Internal helper method for building the query string.
"""
# Hand off the work for an Or set
if isinstance(condition, Query.Or):
self._serialize_or_set(query, keyword, condition, prefix, suffix)
return
# Handle the easy cases.
if not isinstance(condition, Query.Condition):
query.append(self._align(keyword, prefix + condition + suffix))
return
# Start the test string.
test = "%s%s %s" % (prefix, condition.column, condition.test)
# Handle a nested query.
if isinstance(condition.value, Query):
# Serialize the nested query.
nested = condition.value
alias = nested._alias and (" %s" % nested._alias) or ""
serialized = "%s)%s%s" % (nested, alias, suffix)
# Finish the condition.
query.append(self._align(keyword, test + " ("))
query.append(Query.indent(serialized))
self._parms += nested._parms
# Handle a sequence of values.
elif condition.test.upper() in ("IN", "NOT IN"):
# Make sure we have a list.
values = condition.value
if type(values) not in (list, tuple):
values = [values]
# Must have at least one value.
if not values:
raise Exception("%s test with no values" %
repr(condition.test.upper()))
# Add the placeholders.
test += " (%s)" % ", ".join([self.PLACEHOLDER] * len(values))
# Plug in the condition to the query string.
query.append(self._align(keyword, test + suffix))
# Add the parameters.
self._parms += values
# Last case: single value test.
else:
if type(condition.value) in (list, tuple):
raise Exception("Unexpected sequence of values")
query.append(self._align(keyword, "%s %s%s" % (test,
self.PLACEHOLDER,
suffix)))
self._parms.append(condition.value)
def _serialize_join(self, query, join):
"""
Helper function for building the query string.
"""
keyword = join.outer and "LEFT OUTER JOIN" or "JOIN"
# Is this 'table' being constructed on the fly?
if isinstance(join.table, Query):
# Make sure it has been provided with an alias.
alias = join.table._alias
if not alias:
raise Exception("resultset expression without alias")
# SQL Server won't accept placeholders here.
if join.table.parms():
raise Exception("Placeholders not allowed in joined "
"resultset expression")
# Add the table expression indented and in parentheses.
query.append(self._align(keyword, "("))
query.append(Query.indent("%s) %s" % (join.table, alias)))
# No, just a named table.
else:
query.append(self._align(keyword, join.table))
# Add the conditions for the join.
keyword = "ON"
for condition in join.conditions:
self._serialize_condition(query, keyword, condition)
keyword = "AND"
@staticmethod
def indent(block, n=4):
"""
Indent a block containing one or more lines by a number of spaces
"""
if isinstance(block, Query):
block = str(block)
padding = " " * n
end = block.endswith("\n") and "\n" or ""
lines = block.splitlines()
return "\n".join(["%s%s" % (padding, line) for line in lines]) + end
@staticmethod
def _add_sequence_or_value(to_be_added, collection):
if isinstance(to_be_added, list):
collection += to_be_added
elif isinstance(to_be_added, tuple):
collection += list(to_be_added)
else:
collection.append(to_be_added)
class Condition:
"""
Test of a value (typically, but not necessarily a column; could
also be an expression, or even a constant value), against a
second value (which can be a single value, or a query which
returns a single value, or a sequence of values in the case
of an "IN" or "NOT IN" test).
"""
def __init__(self, col, val, test="="):
self.column = col
self.value = val
self.test = test
C = Condition
class Or:
"""
Represents a set of one or more conditions the satisfaction of
any one of which will be considered as satisfying the entire
set.
Simple example:
query = cdrdb.Query('t1', 'c1', 'c2')
first_test = 'c1 < 42'
second_test = query.Condition('c2', get_some_values(), 'IN')
query.where(query.Or(first_test, second_test))
"""
def __init__(self, *conditions):
"""
Accepts one or more conditions, each of which can be either
a string containing a SQL expression, or a Query.Condition
object. Any argument can also be a sequence of SQL expressions
and/or Query.Condition or Query.Or objects, which will all be
ANDed together as a single unit to be ORed against the tests
represented by the other arguments to the constructor. There
is no limit (other than that imposed by the computing resources
on the client and server machines) to the level of nesting
supported for combinations of AND and OR condition sets.
"""
self.conditions = conditions
class Join:
"""
Used internally to represent a SQL JOIN clause
"""
def __init__(self, table, outer, *conditions):
self.table = table
self.outer = outer
self.conditions = []
for condition in conditions:
Query._add_sequence_or_value(condition, self.conditions)
class QueryTests(unittest.TestCase):
"""
Run tests to check the health of the Query class.
"""
# Convenience aliases
Q = Query
C = Query.Condition
@staticmethod
def V(rows):
"""Extract values from rows"""
return [tuple([c for c in r]) for r in rows]
@staticmethod
def D(rows, cursor):
"""Get values as a dictionary"""
n = [d[0] for d in cursor.description]
return [dict([(n[i], v) for i, v in enumerate(row)]) for row in rows]
def setUp(self):
"""
Create some test tables.
"""
self.c = connect(user="CdrGuest").cursor()
self.c.execute("CREATE TABLE #t1 (i INT, n VARCHAR(32))")
self.c.execute("CREATE TABLE #t2 (i INT, n VARCHAR(32))")
self.c.execute("INSERT INTO #t1 VALUES(42, 'Alan')")
self.c.execute("INSERT INTO #t1 VALUES(43, 'Bob')")
self.c.execute("INSERT INTO #t1 VALUES(44, 'Volker')")
self.c.execute("INSERT INTO #t1 VALUES(45, 'Elmer')")
self.c.execute("INSERT INTO #t2 VALUES(42, 'biology')")
self.c.execute("INSERT INTO #t2 VALUES(42, 'aviation')")
self.c.execute("INSERT INTO #t2 VALUES(42, 'history')")
self.c.execute("INSERT INTO #t2 VALUES(43, 'music')")
self.c.execute("INSERT INTO #t2 VALUES(43, 'cycling')")
self.c.execute("INSERT INTO #t2 VALUES(44, 'physics')")
self.c.execute("INSERT INTO #t2 VALUES(44, 'volleyball')")
self.c.execute("INSERT INTO #t2 VALUES(44, 'tennis')")
def test_01_order_by_with_top(self):
q = self.Q("#t1", "i").limit(1).order("1 DESC")
r = self.V(q.execute(self.c, timeout=10).fetchall())
self.assertTrue(r == [(45,)])
def test_02_join_with_count(self):
q = self.Q("#t1", "COUNT(DISTINCT #t1.i)").join("#t2", "#t2.i = #t1.i")
r = self.V(q.execute(self.c).fetchall())
self.assertTrue(r == [(3,)])
def test_03_group_by_and_having(self):
q = self.Q("#t2", "i", "COUNT(*)").group("i").having("COUNT(*) > 2")
r = {row[0] for row in q.execute(self.c).fetchall()}
self.assertTrue(r == {42, 44})
def test_04_left_outer_join_with_is_null(self):
q = self.Q("#t1 a", "a.i", "b.n").outer("#t2 b", "b.i = a.i")
r = self.V(q.where("b.n IS NULL").execute(self.c).fetchall())
self.assertTrue(r == [(45, None,)])
def test_05_nested_ors_and_ands(self):
q = self.Q("#t1 a", "a.n").join("#t2 b", "b.i = a.i").unique()
q.where(self.Q.Or("a.n LIKE 'E%'", ("a.i < 44", "b.n LIKE '%o%'")))
q.where("a.n <> 'Volker'")
r = self.V(q.execute(self.c).fetchall())
self.assertTrue(r == [('Alan',)])
def test_06_condition_object_with_placeholders(self):
v = ('biology', 'physics')
q = self.Q("#t1 a", "a.n").join("#t2 b", "b.i = a.i").unique().order(1)
q.where(self.C("b.n", v, "IN"))
q.timeout(5)
r = [row[0] for row in q.execute(self.c).fetchall()]
self.assertTrue(r == ['Alan', 'Volker'])
def test_07_union(self):
q = self.Q("#t1", "n").where("i > 44")
q.union(self.Q("#t1", "n").where("i < 43"))
r = [r[0] for r in q.order(1).execute(self.c).fetchall()]
self.assertTrue(r == ["Alan", "Elmer"])
def test_08_into(self):
self.Q("#t1", "*").into("#t3").execute(self.c)
q = self.Q("#t3", "n").order(1)
r = [r[0] for r in q.execute(self.c).fetchall()]
self.assertTrue(r == ["Alan", "Bob", "Elmer", "Volker"])
def test_09_nested_query(self):
q = self.Q("#t1", "n")
q.where(self.C("i", self.Q("#t2", "i").unique(), "NOT IN"))
r = self.V(q.execute(self.c).fetchall())
self.assertTrue(r == [("Elmer",)])
def test_10_dictionary_results(self):
c = connect(user="CdrGuest").cursor()
c.execute("CREATE TABLE #t1 (i INT, n VARCHAR(32))")
c.execute("CREATE TABLE #t2 (i INT, n VARCHAR(32))")
c.execute("INSERT INTO #t1 VALUES(42, 'Alan')")
c.execute("INSERT INTO #t1 VALUES(43, 'Bob')")
c.execute("INSERT INTO #t1 VALUES(44, 'Volker')")
c.execute("INSERT INTO #t1 VALUES(45, 'Elmer')")
c.execute("INSERT INTO #t2 VALUES(42, 'biology')")
c.execute("INSERT INTO #t2 VALUES(42, 'aviation')")
c.execute("INSERT INTO #t2 VALUES(42, 'history')")
c.execute("INSERT INTO #t2 VALUES(43, 'music')")
c.execute("INSERT INTO #t2 VALUES(43, 'cycling')")
c.execute("INSERT INTO #t2 VALUES(44, 'physics')")
c.execute("INSERT INTO #t2 VALUES(44, 'volleyball')")
c.execute("INSERT INTO #t2 VALUES(44, 'tennis')")
q = self.Q("#t1", "n")
q.where(self.C("i", self.Q("#t2", "i").unique(), "NOT IN"))
r = self.D(q.execute(c).fetchall(), c)
self.assertTrue(r == [{"n": "Elmer"}])
def test_11_union_with_virtual_queries(self):
q1 = self.Q("#t2", "n").limit(1).alias("q1").order("n")
q2 = self.Q("#t2", "n").limit(1).alias("q2").order("n DESC")
u = self.Q(q1, "*").union(self.Q(q2, "*"))
r = [r[0] for r in u.execute(self.c).fetchall()]
self.assertTrue(r == ["aviation", "volleyball"])
if __name__ == "__main__":
unittest.main()