Skip to content

Commit b37826d

Browse files
zxqfd555Manul from Pathway
authored andcommitted
full sqlite support (#10191)
GitOrigin-RevId: 14bf058e9448ebde1e676bec676db8b262992977
1 parent 225233e commit b37826d

9 files changed

Lines changed: 3446 additions & 150 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ All notable changes to this project will be documented in this file.
55
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66
## [Unreleased]
77

8+
### Added
9+
- `pw.io.sqlite.write` connector, which writes a Pathway table into a SQLite database file. Supports two modes: `stream_of_changes` (default) appends each event alongside `time`/`diff` metadata columns, while `snapshot` maintains the current state of the table via `INSERT ... ON CONFLICT DO UPDATE` on insertions and `DELETE` on retractions, keyed on the `primary_key` parameter. Values are encoded using the same storage-class mapping that `pw.io.sqlite.read` accepts, so `write` / `read` round-trips every supported Pathway type losslessly. `init_mode` controls whether the destination table is left as-is, auto-created, or replaced on start-up.
10+
811
### Changed
12+
- `pw.io.sqlite.read` now parses every Pathway `Value` variant. In addition to `int`, `float`, `str`, `bytes`, `pw.Json`, and their `Optional` forms, the reader now accepts `bool`, `pw.DateTimeNaive`, `pw.DateTimeUtc`, `pw.Duration`, `pw.Pointer`, `pw.PyObjectWrapper`, homogeneous `tuple` / `list`, and `np.ndarray`. Composite types are stored as `TEXT` using the same JSON encoding that `pw.io.jsonlines.write` emits. Booleans additionally accept PostgreSQL-style textual literals (`true`/`false`, `yes`/`no`, `on`/`off`, `t`/`f`, `y`/`n`; case-insensitive, whitespace-trimmed), and `float` columns tolerate values stored with `INTEGER` storage class.
913
- **BREAKING**: `pw.io.iceberg.write` to a Glue catalog no longer accepts `DateTimeUtc` columns. Glue's metastore has no timezone-aware timestamp type, so previous versions silently dropped the timezone on read-back; writes now fail with an explicit error instead of corrupting the zone. To store UTC timestamps in Glue, convert to `DateTimeNaive` with UTC-normalized values, or write through the REST catalog, which preserves the timezone.
1014

1115
## [0.30.1] - 2026-04-23

docs/2.developers/4.user-guide/20.connect/30.connectors-in-pathway.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ Before going into more details about the different connectors and how they work,
9292
<span class="block"><a href="/developers/api-docs/pathway-io/rabbitmq">RabbitMQ</a></span>
9393
<span class="block"><a href="/developers/user-guide/connect/connectors/switching-to-redpanda">Redpanda</a></span>
9494
<span class="block"><a href="/developers/user-guide/connect/connectors/slack_send_alerts">Slack</a></span>
95+
<span class="block"><a href="/developers/api-docs/pathway-io/sqlite">SQLite</a></span>
9596
</td>
9697
<td class="text-center !align-middle">
9798
<span class="block"><a href="/developers/user-guide/connect/connectors/csv_connectors">CSV</a></span>

python/pathway/io/sqlite/__init__.py

Lines changed: 349 additions & 9 deletions
Large diffs are not rendered by default.

python/pathway/tests/test_io.py

Lines changed: 0 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import random
1313
import re
1414
import socket
15-
import sqlite3
1615
import sys
1716
import threading
1817
import time
@@ -2688,81 +2687,6 @@ class InputSchema(pw.Schema):
26882687
assert_table_equality_wo_index(result, expected)
26892688

26902689

2691-
@needs_multiprocessing_fork
2692-
def test_sqlite(tmp_path: pathlib.Path):
2693-
database_name = tmp_path / "test.db"
2694-
output_path = tmp_path / "output.csv"
2695-
2696-
connection = sqlite3.connect(database_name)
2697-
cursor = connection.cursor()
2698-
cursor.execute(
2699-
"""
2700-
CREATE TABLE users (
2701-
id INTEGER,
2702-
login TEXT,
2703-
name TEXT
2704-
)
2705-
"""
2706-
)
2707-
cursor.execute("INSERT INTO users (id, login, name) VALUES (1, 'alice', 'Alice')")
2708-
cursor.execute("INSERT INTO users (id, login, name) VALUES (2, 'bob1999', 'Bob')")
2709-
connection.commit()
2710-
2711-
def stream_target():
2712-
wait_result_with_checker(FileLinesNumberChecker(output_path, 2), 5, target=None)
2713-
connection = sqlite3.connect(database_name)
2714-
cursor = connection.cursor()
2715-
cursor.execute(
2716-
"""
2717-
INSERT INTO users (id, login, name) VALUES (3, 'ch123', 'Charlie')"""
2718-
)
2719-
connection.commit()
2720-
2721-
wait_result_with_checker(FileLinesNumberChecker(output_path, 3), 2, target=None)
2722-
cursor = connection.cursor()
2723-
cursor.execute("UPDATE users SET name = 'Bob Smith' WHERE id = 2")
2724-
connection.commit()
2725-
2726-
wait_result_with_checker(FileLinesNumberChecker(output_path, 5), 2, target=None)
2727-
cursor = connection.cursor()
2728-
cursor.execute("DELETE FROM users WHERE id = 3")
2729-
connection.commit()
2730-
2731-
class InputSchema(pw.Schema):
2732-
id: int
2733-
login: str
2734-
name: str
2735-
2736-
table = pw.io.sqlite.read(
2737-
database_name, "users", InputSchema, autocommit_duration_ms=1
2738-
)
2739-
pw.io.jsonlines.write(table, output_path)
2740-
2741-
inputs_thread = threading.Thread(target=stream_target, daemon=True)
2742-
inputs_thread.start()
2743-
2744-
wait_result_with_checker(FileLinesNumberChecker(output_path, 6), 30)
2745-
2746-
events = []
2747-
with open(output_path) as f:
2748-
for row in f:
2749-
events.append(json.loads(row))
2750-
2751-
events.sort(key=lambda event: (event["time"], event["diff"], event["name"]))
2752-
events_truncated = []
2753-
for event in events:
2754-
events_truncated.append([event["name"], event["diff"]])
2755-
2756-
assert events_truncated == [
2757-
["Alice", 1],
2758-
["Bob", 1],
2759-
["Charlie", 1],
2760-
["Bob", -1],
2761-
["Bob Smith", 1],
2762-
["Charlie", -1],
2763-
]
2764-
2765-
27662690
def test_apply_bytes_full_cycle(tmp_path: pathlib.Path):
27672691
input_path = tmp_path / "input.txt"
27682692
input_full_contents = "abc\n\ndef\nghi"

0 commit comments

Comments
 (0)