Skip to content

Commit 2fd0aed

Browse files
authored
add type hints and more inline pydoc (#5)
1 parent f8ef1e3 commit 2fd0aed

4 files changed

Lines changed: 182 additions & 32 deletions

File tree

verdin/datasource.py

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,42 @@
33
import json
44
import logging
55
import os
6-
from typing import Any, Dict, List, Optional, Tuple, Union
6+
from typing import Any, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
77

88
import requests
99

1010
from . import config
1111

12+
if TYPE_CHECKING:
13+
from _typeshed import SupportsWrite
14+
1215
LOG = logging.getLogger(__name__)
1316

1417
Record = Union[Tuple, List[Any]]
1518
Records = List[Record]
1619

1720

1821
def to_csv(records: Records, **kwargs) -> str:
22+
"""
23+
Convert the given records to CSV using a CSV writer, and return them as a single string.
24+
25+
:param records: The records to convert to CSV.
26+
:param kwargs: Args to be passed to ``csv.writer``.
27+
:return: A string representing the CSV
28+
"""
1929
output = io.StringIO()
2030
write_csv(output, records, **kwargs)
2131
return output.getvalue()
2232

2333

24-
def write_csv(file, records: Records, **kwargs):
34+
def write_csv(file: "SupportsWrite[str]", records: Records, **kwargs):
35+
"""
36+
Converts the given records to CSV and writes them to the given file.
37+
38+
:param file: The file passed to the CSV writer.
39+
:param records: The records to convert to CSV.
40+
:param kwargs: Args to be passed to ``csv.writer``.
41+
"""
2542
# TODO: do proper type conversion here to optimize for CSV input
2643
# see: https://guides.tinybird.co/guide/fine-tuning-csvs-for-fast-ingestion
2744

@@ -45,24 +62,39 @@ class Datasource:
4562
name: str
4663
version: Optional[int]
4764

48-
def __init__(self, name, token, version: int = None, api=None) -> None:
65+
def __init__(self, name, token, version: int = None, api: str = None) -> None:
4966
self.name = name
5067
self.token = token
5168
self.version = version
5269
self.api = (api or config.API_URL).rstrip("/") + self.endpoint
5370

5471
@property
55-
def canonical_name(self):
72+
def canonical_name(self) -> str:
73+
"""
74+
Returns the name of the table that can be queried. If a version is specified, the name will be suffixed with
75+
``__v<version>``. Otherwise, this just returns the name. Note that versions are discouraged in the current
76+
tinybird workflows.
77+
78+
:return: The canonical name of the table that can be used in queries
79+
"""
5680
if self.version is not None:
5781
return f"{self.name}__v{self.version}"
5882
else:
5983
return self.name
6084

61-
def append(self, records: List[Record], *args, **kwargs) -> requests.Response:
85+
def append(self, records: Records, *args, **kwargs) -> requests.Response:
86+
"""Calls ``append_csv``."""
6287
# TODO: replicate tinybird API concepts instead of returning Response
6388
return self.append_csv(records, *args, **kwargs)
6489

65-
def append_csv(self, records: List[Record], delimiter: str = ",") -> requests.Response:
90+
def append_csv(self, records: Records, delimiter: str = ",") -> requests.Response:
91+
"""
92+
Makes a POST request to the datasource using mode=append with CSV data. This appends data to the table.
93+
94+
:param records: List of records to append. The will be converted to CSV using the provided delimiter.
95+
:param delimiter: Optional delimiter (defaults to ",")
96+
:return: The HTTP response
97+
"""
6698
params = {"name": self.canonical_name, "mode": "append"}
6799
if delimiter:
68100
params["dialect_delimiter"] = delimiter
@@ -84,6 +116,12 @@ def append_csv(self, records: List[Record], delimiter: str = ",") -> requests.Re
84116
return requests.post(url=self.api, params=params, headers=headers, data=data)
85117

86118
def append_ndjson(self, records: List[Dict]) -> requests.Response:
119+
"""
120+
Makes a POST request to the datasource using mode=append with ndjson data. This appends data to the table.
121+
122+
:param records: List of JSON records to append. The will be converted to NDJSON using ``json.dumps``
123+
:return: The HTTP response
124+
"""
87125
# TODO: generalize appending in different formats
88126
query = {"name": self.canonical_name, "mode": "append", "format": "ndjson"}
89127

@@ -130,14 +168,16 @@ def __repr__(self):
130168

131169

132170
class FileDatasource(Datasource):
133-
# for debugging/development purposes
171+
"""
172+
Datasource that writes into a file, used for testing and development purposes.
173+
"""
134174

135175
def __init__(self, path: str):
136176
name = os.path.basename(path)
137177
super().__init__(name, None)
138178
self.path = path
139179

140-
def append(self, records: Records) -> requests.Response:
180+
def append_csv(self, records: Records, *args, **kwargs) -> requests.Response:
141181
if records:
142182
with open(self.path, "a") as fd:
143183
write_csv(fd, records)
@@ -152,3 +192,6 @@ def append_ndjson(self, records: List[Dict]) -> requests.Response:
152192
def readlines(self) -> List[str]:
153193
with open(self.path, "r") as fd:
154194
return fd.readlines()
195+
196+
def truncate(self):
197+
raise NotImplementedError

verdin/pipe.py

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,74 @@
11
import logging
2-
from typing import Any, Dict, Iterator, List, Optional, Tuple
2+
from typing import Any, Iterator, Optional
33

44
import requests
55

66
from . import config
77

88
LOG = logging.getLogger(__name__)
99

10-
PipeMetadata = List[Tuple[str, str]]
11-
PipeJsonData = List[Dict[str, Any]]
10+
PipeMetadata = list[tuple[str, str]]
11+
PipeJsonData = list[dict[str, Any]]
1212

1313

1414
class PipeError(Exception):
15+
"""
16+
Wrapper of the HTTP response returned by a Pipe query if the HTTP response is not a 2XX code.
17+
"""
18+
1519
response: requests.Response
1620

17-
def __init__(self, response) -> None:
21+
def __init__(self, response: requests.Response) -> None:
1822
self.response = response
19-
self.json: Dict = response.json()
23+
self.json: dict = response.json()
2024
super().__init__(self.description)
2125

2226
@property
23-
def description(self):
27+
def description(self) -> str:
2428
return self.json.get("error")
2529

2630

2731
class PipeJsonResponse:
32+
"""
33+
Wrapper of the HTTP response returned by a Pipe query.
34+
"""
35+
2836
response: requests.Response
29-
result: Dict
37+
result: dict
3038

31-
def __init__(self, response):
39+
def __init__(self, response: requests.Response):
3240
self.response = response
3341
self.result = response.json()
3442

3543
@property
36-
def empty(self):
44+
def empty(self) -> bool:
45+
"""
46+
A property to check if the data in the result is empty.
47+
48+
This property evaluates whether the "data" field within the "result"
49+
attribute is empty.
50+
51+
:return: Returns True if the "data" field in "result" is missing or empty,
52+
otherwise returns False.
53+
"""
3754
return not self.result.get("data")
3855

3956
@property
4057
def meta(self) -> PipeMetadata:
58+
"""
59+
Returns the PipeMetadata from the query, which includes attributes and their types.
60+
61+
:return: The PipeMetadata
62+
"""
4163
return [(t["name"], t["type"]) for t in self.result.get("meta", [])]
4264

4365
@property
4466
def data(self) -> PipeJsonData:
67+
"""
68+
Returns the data from the query, which is a list of dictionaries representing the rows of the query result.
69+
70+
:return: The PipeJsonData
71+
"""
4572
return self.result.get("data")
4673

4774

@@ -83,26 +110,47 @@ class Pipe:
83110
version: Optional[int]
84111
resource: str
85112

86-
def __init__(self, name, token, version: int = None, api=None) -> None:
113+
def __init__(self, name, token, version: int = None, api: str = None) -> None:
87114
super().__init__()
88115
self.name = name
89116
self.token = token
90117
self.version = version
91118
self.resource = (api or config.API_URL).rstrip("/") + self.endpoint
92119

93120
@property
94-
def canonical_name(self):
121+
def canonical_name(self) -> str:
122+
"""
123+
Returns the name of the pipe that can be queried. If a version is specified, the name will be suffixed with
124+
``__v<version>``. Otherwise, this just returns the name. Note that versions are discouraged in the current
125+
tinybird workflows.
126+
127+
:return: The canonical name of the pipe that can be used in queries
128+
"""
95129
if self.version is not None:
96130
return f"{self.name}__v{self.version}"
97131
else:
98132
return self.name
99133

100134
@property
101-
def pipe_url(self):
135+
def pipe_url(self) -> str:
136+
"""
137+
Returns the API URL of this pipe. It's something like ``https://api.tinybird.co/v0/pipes/my_pipe.json``.
138+
139+
:return: The Pipe API URL
140+
"""
102141
return self.resource + "/" + self.canonical_name + ".json"
103142

104-
def query(self, params=None) -> PipeJsonResponse:
105-
params = params or dict()
143+
def query(self, params: dict[str, str] = None) -> PipeJsonResponse:
144+
"""
145+
Query the pipe endpoint using the given dynamic parameters. Note that the pipe needs to be exposed as an
146+
endpoint.
147+
148+
See: https://www.tinybird.co/docs/forward/work-with-data/query-parameters#use-pipes-api-endpoints-with-dynamic-parameters
149+
150+
:param params: The dynamic parameters of the pipe and the values for your query
151+
:return: a PipeJsonResponse containing the result of the query
152+
"""
153+
params = params or {}
106154
if "token" not in params and self.token:
107155
params["token"] = self.token
108156

@@ -114,6 +162,15 @@ def query(self, params=None) -> PipeJsonResponse:
114162
raise PipeError(response)
115163

116164
def pages(self, page_size: int = 50, start_at: int = 0) -> PipePageIterator:
165+
"""
166+
Returns an iterator over the pipe's data pages. Each page contains ``page_size`` records.
167+
168+
TODO: currently we don't support dynamic parameters with paged queries
169+
170+
:param page_size: The size of each page (default 50)
171+
:param start_at: The offset at which to start (default 0)
172+
:return:
173+
"""
117174
return PagedPipeQuery(pipe=self, page_size=page_size, start_at=start_at)
118175

119176
def sql(self, query: str) -> PipeJsonResponse:
@@ -123,6 +180,9 @@ def sql(self, query: str) -> PipeJsonResponse:
123180
pipe.sql("select count() from _")
124181
125182
See https://docs.tinybird.co/api-reference/query-api.html
183+
184+
:param query: The SQL query to run
185+
:return: The result of the query
126186
"""
127187
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
128188
params = {"q": query}

0 commit comments

Comments
 (0)