-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathquery.py
More file actions
111 lines (89 loc) · 2.99 KB
/
query.py
File metadata and controls
111 lines (89 loc) · 2.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Code generated by sqlc. DO NOT EDIT.
# versions:
# sqlc v1.30.0
# source: query.sql
from typing import AsyncIterator, Iterator, Optional
import sqlalchemy
import sqlalchemy.ext.asyncio
from db import models
CREATE_BOOK = """-- name: create_book \\:one
INSERT INTO books (
title, status
) VALUES (
:p1, :p2
) RETURNING id, title, status
"""
DELETE_BOOK = """-- name: delete_book \\:exec
DELETE FROM books
WHERE id = :p1
"""
GET_BOOK = """-- name: get_book \\:one
SELECT id, title, status FROM books
WHERE id = :p1 LIMIT 1
"""
LIST_BOOKS = """-- name: list_books \\:many
SELECT id, title, status FROM books
ORDER BY title
"""
class Querier:
def __init__(self, conn: sqlalchemy.engine.Connection):
self._conn = conn
def create_book(self, *, title: str, status: Optional[models.BookStatus]) -> Optional[models.Book]:
row = self._conn.execute(sqlalchemy.text(CREATE_BOOK), {"p1": title, "p2": status}).first()
if row is None:
return None
return models.Book(
id=row[0],
title=row[1],
status=row[2],
)
def delete_book(self, *, id: int) -> None:
self._conn.execute(sqlalchemy.text(DELETE_BOOK), {"p1": id})
def get_book(self, *, id: int) -> Optional[models.Book]:
row = self._conn.execute(sqlalchemy.text(GET_BOOK), {"p1": id}).first()
if row is None:
return None
return models.Book(
id=row[0],
title=row[1],
status=row[2],
)
def list_books(self) -> Iterator[models.Book]:
result = self._conn.execute(sqlalchemy.text(LIST_BOOKS))
for row in result:
yield models.Book(
id=row[0],
title=row[1],
status=row[2],
)
class AsyncQuerier:
def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection):
self._conn = conn
async def create_book(self, *, title: str, status: Optional[models.BookStatus]) -> Optional[models.Book]:
row = (await self._conn.execute(sqlalchemy.text(CREATE_BOOK), {"p1": title, "p2": status})).first()
if row is None:
return None
return models.Book(
id=row[0],
title=row[1],
status=row[2],
)
async def delete_book(self, *, id: int) -> None:
await self._conn.execute(sqlalchemy.text(DELETE_BOOK), {"p1": id})
async def get_book(self, *, id: int) -> Optional[models.Book]:
row = (await self._conn.execute(sqlalchemy.text(GET_BOOK), {"p1": id})).first()
if row is None:
return None
return models.Book(
id=row[0],
title=row[1],
status=row[2],
)
async def list_books(self) -> AsyncIterator[models.Book]:
result = await self._conn.stream(sqlalchemy.text(LIST_BOOKS))
async for row in result:
yield models.Book(
id=row[0],
title=row[1],
status=row[2],
)