Skip to content

Commit 22489fd

Browse files
committed
include more tests now that catalog.db is filled
1 parent 972a4d0 commit 22489fd

19 files changed

Lines changed: 1179 additions & 59 deletions

pysus/api/dadosgov/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
import pathlib
44
from collections.abc import Callable
55
from datetime import datetime
6-
from typing import Annotated, Any, Optional
6+
from typing import TYPE_CHECKING, Annotated, Any, Optional
77

88
import httpx
99
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, PrivateAttr
1010
from pysus import __version__
1111
from pysus.api.models import BaseRemoteClient, BaseRemoteFile
1212

13-
from .models import Dataset
13+
if TYPE_CHECKING:
14+
from .models import Dataset
1415

1516

1617
def to_datetime(value: Any) -> datetime | None:

pysus/api/dadosgov/models.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async def _fetch_files(self) -> list[BaseRemoteFile]:
180180

181181
class Dataset(BaseRemoteDataset):
182182
ids: list[str] = []
183-
client: DadosGov
183+
client: "DadosGov"
184184

185185
def __repr__(self):
186186
return self.name
@@ -191,7 +191,7 @@ def formatter(self, filename: str) -> dict[str, Any]:
191191

192192
async def _fetch_content(self) -> list[Group]:
193193
items: list[Group] = []
194-
client: DadosGov = self.client
194+
client: "DadosGov" = self.client
195195
if self.ids:
196196
for group_id in self.ids:
197197
record = await client.get_dataset(group_id)

pysus/api/ducklake/catalog.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@ class Base(DeclarativeBase):
2424
file_columns = Table(
2525
"file_columns",
2626
Base.metadata,
27-
Column("file_id", Integer, ForeignKey("pysus.files.id"), primary_key=True),
27+
Column(
28+
"file_id",
29+
Integer,
30+
ForeignKey("pysus.files.id"),
31+
primary_key=True,
32+
),
2833
Column(
2934
"column_id",
3035
Integer,
@@ -196,7 +201,10 @@ class CatalogFile(CatalogTable):
196201
back_populates="files",
197202
)
198203
columns: Mapped[list["ColumnDefinition"]] = relationship(
199-
"ColumnDefinition", secondary=file_columns, back_populates="files"
204+
"ColumnDefinition",
205+
secondary=file_columns,
206+
back_populates="files",
207+
cascade="all, delete",
200208
)
201209

202210
__table_args__ = (

pysus/api/extensions.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141

4242
class File(BaseLocalFile):
43-
type: FileType = Field(None)
43+
type: FileType = Field("FILE")
4444

4545
async def load(self) -> bytes:
4646
return await to_thread.run_sync(self.path.read_bytes)
@@ -181,24 +181,28 @@ def rows(self) -> int:
181181

182182
async def load(self, parse: bool = True) -> pd.DataFrame:
183183
def _load():
184-
df = pd.read_parquet(self.path)
184+
df = pd.read_parquet(self.path, engine="pyarrow")
185185
return self.parse_dftypes(df) if parse else df
186186

187187
return await to_thread.run_sync(_load)
188188

189189
async def stream(
190-
self, chunk_size: int = 10000
190+
self, chunk_size: int = 10000, parse: bool = False
191191
) -> AsyncGenerator[pd.DataFrame, None]:
192192
parquet_file = await to_thread.run_sync(pq.ParquetFile, self.path)
193193

194194
for batch in parquet_file.iter_batches(batch_size=chunk_size):
195195
df = batch.to_pandas()
196+
if parse:
197+
df = self.parse_dftypes(df)
196198
yield df
197199
await asyncio.sleep(0)
198200

199201
@staticmethod
200202
def parse_dftypes(df: pd.DataFrame) -> pd.DataFrame:
201203
def str_to_int(string):
204+
if pd.isna(string):
205+
return string
202206
clean = str(string).replace(" ", "")
203207
return int(clean) if clean.isnumeric() else string
204208

@@ -211,15 +215,16 @@ def str_to_date(string):
211215
return string
212216

213217
cols_to_date = ["DT_NOTIFIC", "DT_SIN_PRI", "DT_NASC", "DT_INTER"]
214-
cols_to_int = ["CODMUNRES", "SEXO", "IDADE"]
218+
cols_to_int = ["CODMUNRES", "IDADE"]
215219

216220
for col in df.columns:
217221
if col in cols_to_date:
218222
df[col] = df[col].map(str_to_date)
219223
elif col in cols_to_int:
220224
df[col] = df[col].map(str_to_int)
221225

222-
df = df.map(lambda x: "" if str(x).isspace() else x)
226+
df = df.replace(r"^\s+$", "", regex=True)
227+
223228
return df.convert_dtypes()
224229

225230

@@ -733,5 +738,11 @@ async def instantiate(cls, path: str | Path) -> BaseLocalFile:
733738
path = Path(path).expanduser().resolve()
734739
if await to_thread.run_sync(path.is_dir):
735740
return Directory(path=path, type="DIR")
741+
736742
FileClass = await cls.get_file_class(path)
737-
return FileClass(path=path, type=FileClass.type)
743+
file_type = getattr(FileClass, "type", "FILE")
744+
745+
if not isinstance(file_type, str):
746+
file_type = "FILE"
747+
748+
return FileClass(path=path, type=file_type)

pysus/api/ftp/client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@ def description(self) -> str:
5757
"""
5858

5959
@property
60-
def ftp(self) -> FTPLib:
61-
if not self._ftp:
62-
raise ConnectionError("FTP Not properly connected")
60+
def ftp(self) -> FTPLib | None:
6361
return self._ftp
6462

6563
async def connect(self) -> None:

pysus/api/ftp/models.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,28 @@
1717
)
1818
from pysus.api.types import State
1919

20-
from .client import FTP, FTPFileInfo, FTPGroupInfo
20+
from .client import FTP, FTPFileInfo
2121

2222

2323
class File(BaseRemoteFile):
2424
_info: FTPFileInfo = PrivateAttr()
2525

2626
def __init__(self, **data):
2727
info = data.pop("_info", None)
28-
path = data.pop("path", None)
28+
if "path" not in data and info and "path" in info:
29+
data["path"] = info["path"]
2930

3031
super().__init__(**data)
3132

32-
if info is not None:
33-
self._info = info
34-
35-
if path is not None:
36-
self._path = path
33+
self._info = info
34+
group_data = self._info.get("group")
35+
if group_data:
36+
self.group = Group(
37+
path=str(self.path.parent),
38+
dataset=self.dataset,
39+
long_name=group_data.get("long_name", ""),
40+
description=group_data.get("description", ""),
41+
)
3742

3843
def __repr__(self) -> str:
3944
return self.name
@@ -53,10 +58,6 @@ def modify(self) -> datetime:
5358
raise ValueError("File requires a modify date")
5459
return m
5560

56-
@property
57-
def group_info(self) -> FTPGroupInfo | None:
58-
return self._info.get("group")
59-
6061
@property
6162
def year(self) -> int | None:
6263
return self._info.get("year")
@@ -109,7 +110,10 @@ async def content(self) -> list[Directory | File]:
109110
async def load(self) -> None:
110111
if not isinstance(self.client, FTP):
111112
raise ValueError("no ftp client found")
112-
raw_infos = await self.client._list_directory(self.path, self.formatter)
113+
raw_infos = await self.client._list_directory(
114+
self.path,
115+
self.formatter,
116+
)
113117
self._content = []
114118

115119
current_group = (
@@ -157,8 +161,11 @@ def __init__(
157161
dataset: Dataset,
158162
long_name: str,
159163
description: str = "",
164+
**data: Any,
160165
):
161-
super().__init__(dataset=dataset)
166+
data.update({"dataset": dataset, "path": path})
167+
super().__init__(**data)
168+
162169
self._long_name = long_name
163170
self._description = description
164171
self._dir = Directory(

pysus/api/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ async def to_parquet(
130130
chunk_size: int = 10000,
131131
callback: Callable[[int, int], None] | None = None,
132132
) -> Parquet:
133-
from pysus.api.extensions import ExtensionFactory
133+
from pysus.api.extensions import ExtensionFactory, Parquet
134134

135135
if output_path is None:
136136
output_path = self.path.with_suffix(".parquet")

pysus/api/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import Literal
22

33
FileType = Literal[
4-
None,
4+
"FILE",
55
"DIR",
66
"PARQUET",
77
"CSV",

pysus/management/client.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
ColumnDefinition,
1414
DatasetGroup,
1515
Origin,
16+
file_columns,
1617
)
1718
from pysus.api.extensions import Parquet
1819
from pysus.api.ftp.models import File as FTPFile
1920
from pysus.api.models import BaseRemoteFile
21+
from sqlalchemy import delete
2022

2123

2224
class CatalogManager:
@@ -59,6 +61,7 @@ async def upload(
5961
) -> None:
6062
if not self.pysus._ducklake:
6163
raise ConnectionError("DuckLake is not connected")
64+
6265
with self.pysus._ducklake._Session() as session:
6366
dataset = self._get_or_create_dataset(session, file)
6467
group = self._get_or_create_group(session, file, dataset)
@@ -67,7 +70,7 @@ async def upload(
6770
if not self._should_upload(file, cat_file):
6871
return
6972

70-
session.commit()
73+
session.flush()
7174

7275
parquet_ext = await self.pysus.download_to_parquet(
7376
file=file, token=self.dadosgov_token, callback=callback
@@ -78,25 +81,37 @@ async def upload(
7881
f"/{file.dataset.name.lower()}/{parquet_ext.path.name}"
7982
)
8083

81-
await self._upload_to_s3(parquet_ext.path, s3_key)
82-
8384
with self.pysus._ducklake._Session() as session:
84-
current_dataset = self._get_or_create_dataset(session, file)
85-
current_group = self._get_or_create_group(
86-
session, file, current_dataset
85+
existing_conflict = (
86+
session.query(CatalogFile)
87+
.filter(
88+
CatalogFile.path == s3_key,
89+
CatalogFile.dataset_id == dataset.id,
90+
)
91+
.first()
8792
)
8893

89-
cat_file = self._get_or_create_file(
90-
session, file, current_dataset, current_group
91-
)
94+
if existing_conflict and existing_conflict.id != cat_file.id:
95+
session.execute(
96+
delete(file_columns).where(
97+
file_columns.c.file_id == existing_conflict.id
98+
)
99+
)
100+
session.flush()
101+
session.delete(existing_conflict)
102+
session.flush()
103+
104+
cat_file = session.merge(cat_file)
105+
106+
await self._upload_to_s3(parquet_ext.path, s3_key)
92107

93108
cat_file.path = s3_key
94109
cat_file.size = parquet_ext.size
95110
cat_file.rows = parquet_ext.rows
96111
cat_file.modified = datetime.utcnow()
97112
cat_file.origin_modified = file.modify
98113
cat_file.columns = self._get_or_create_columns(
99-
session, current_dataset, parquet_ext
114+
session, dataset, parquet_ext
100115
)
101116

102117
session.commit()
@@ -140,9 +155,8 @@ def _get_or_create_dataset(
140155
ds_name = file.dataset.name.lower()
141156
ds = session.query(CatalogDataset).filter_by(name=ds_name).first()
142157
if not ds:
143-
origin = (
144-
Origin.FTP if file.client.name.lower() == "ftp" else Origin.API
145-
)
158+
is_ftp = file.client.name.lower() == "ftp"
159+
origin = Origin.FTP if is_ftp else Origin.API
146160
ds = CatalogDataset(
147161
name=ds_name, long_name=file.dataset.long_name, origin=origin
148162
)
@@ -201,7 +215,7 @@ def _get_or_create_file(
201215
size=0,
202216
rows=0,
203217
modified=datetime.min,
204-
origin_path=file.path,
218+
origin_path=str(file.path),
205219
year=file.year,
206220
month=file.month,
207221
state=file.state,

0 commit comments

Comments
 (0)