Skip to content
This repository was archived by the owner on Sep 4, 2025. It is now read-only.

Commit f3b40a5

Browse files
authored
Merge pull request #101 from 0xgouda/add-pyiceberg-sink
[+] add python based iceberg sink
2 parents 56af4a3 + da8bbe7 commit f3b40a5

8 files changed

Lines changed: 273 additions & 3 deletions

File tree

.github/workflows/test.yml

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,26 @@ jobs:
4747
- name: Setup Protobuf
4848
uses: ./.github/actions/setup-protobuf
4949

50-
- name: Test
50+
- name: Set up Python
51+
uses: actions/setup-python@v5
52+
with:
53+
python-version: '3.12'
54+
55+
- name: Install dependencies
56+
run: |
57+
python3 -m pip install --upgrade pip
58+
pip install pytest
59+
pip install -r cmd/pyiceberg_receiver/requirements.txt
60+
61+
- name: Golang Tests
5162
run: |
5263
go generate ./sinks/pb
53-
go test -timeout 10m -failfast -v -coverprofile=profile.cov ./...
64+
go test -timeout 20m -failfast -v -coverprofile=profile.cov ./...
65+
66+
- name: Python Tests
67+
run: |
68+
python3 -m grpc_tools.protoc -I sinks/pb --python_out=cmd/pyiceberg_receiver --grpc_python_out=cmd/pyiceberg_receiver sinks/pb/pgwatch.proto
69+
pytest
5470
5571
- name: Coveralls
5672
uses: coverallsapp/github-action@v2

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
./*/*/*.txt
22
.vscode/
3-
*pb.go
3+
*pb.go
4+
*pb2*.py
5+
__pycache__/
6+
.pytest_cache/
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
catalog:
2+
pgcatalog:
3+
type: sql
4+
uri: postgresql://username:password@localhost:5432/database
5+
init_catalog_tables: true
6+
echo: false
7+
pool_pre_ping: false

cmd/pyiceberg_receiver/README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Iceberg Receiver
2+
3+
A gRPC server that writes metrics received from pgwatch in Iceberg Table format.
4+
5+
- The server assumes a PostgreSQL catalog is used and creates the `pgwatch` namespace and the `pgwatch.metrics` table within it if they don't exist.
6+
- The table is partitioned by `MetricName` and `DBName` (in order).
7+
- Metrics are written in the local file system as Apache Arrow records with the following schema:
8+
```python
9+
Schema(
10+
NestedField(field_id=1, name="DBName", field_type=StringType(), required=True),
11+
NestedField(field_id=2, name="MetricName", field_type=StringType(), required=True),
12+
NestedField(field_id=3, name="Data", field_type=StringType(), required=True),
13+
)
14+
```
15+
- Catalog configuration should be provided in the [.pyiceberg.yaml](./.pyiceberg.yaml) file under `pgcatalog`; see [PyIceberg SQL Catalog](https://py.iceberg.apache.org/configuration/#sql-catalog) for details.
16+
17+
## Flags
18+
19+
```bash
20+
usage: pyiceberg_receiver [-h] -p PORT -d DIR
21+
22+
options:
23+
-h, --help show this help message and exit
24+
-p PORT, --port PORT The port number to use for the gRPC server.
25+
-d DIR, --iceberg-data-dir DIR
26+
Directory to store iceberg tables in.
27+
```
28+
29+
## Usage example
30+
31+
```bash
32+
# generate python gRPC code from protobuf
33+
python3 -m grpc_tools.protoc -I sinks/pb --python_out=cmd/pyiceberg_receiver --grpc_python_out=cmd/pyiceberg_receiver sinks/pb/pgwatch.proto
34+
# install dependencies
35+
pip install -r cmd/pyiceberg_receiver/requirements.txt
36+
# tell PyIceberg about the dir to look for .pyiceberg.yaml in
37+
export PYICEBERG_HOME="cmd/pyiceberg_receiver"
38+
# run the server
39+
python3 cmd/pyiceberg_receiver -p <grpc-server-port> -d <dir-path>
40+
```
41+
42+
## TODO
43+
44+
- [ ] Use object storage instead of the local file system.
45+
- [ ] Support TLS over the gRPC connection.
46+
- [ ] Add authentication interceptor.
47+
- [ ] Cache measurements to minimize the number of Parquet files written.

cmd/pyiceberg_receiver/__main__.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import grpc
2+
import argparse
3+
from concurrent import futures
4+
from iceberg_receiver import IcebergReceiver
5+
from pgwatch_pb2_grpc import add_ReceiverServicer_to_server
6+
7+
# Command-line interface of the receiver; both options are mandatory.
parser = argparse.ArgumentParser()

# TCP port the gRPC server will listen on.
parser.add_argument(
    "-p",
    "--port",
    action="store",
    dest="port",
    type=int,
    required=True,
    help="The port number to use for the gRPC server.",
)

# Directory handed to IcebergReceiver as the table location.
parser.add_argument(
    "-d",
    "--iceberg-data-dir",
    action="store",
    dest="icebergDataDir",
    metavar="DIR",
    type=str,
    required=True,
    help="Directory to store iceberg tables in.",
)

# Parsed once at start-up; `serve` reads `args.icebergDataDir` from here.
args = parser.parse_args()
27+
28+
def serve(port: int):
    """Start the gRPC receiver on ``port`` and block until it terminates.

    The Iceberg data directory is taken from the module-level ``args``
    (``args.icebergDataDir``). The server listens on all interfaces over an
    insecure (non-TLS) port.

    Args:
        port: TCP port number to bind the gRPC server to.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)

    # Register the Iceberg-backed servicer as the handler for Receiver RPCs.
    receiver = IcebergReceiver(args.icebergDataDir)
    add_ReceiverServicer_to_server(receiver, grpc_server)

    listen_address = f"0.0.0.0:{port}"
    grpc_server.add_insecure_port(listen_address)

    grpc_server.start()
    print(f"gRPC server started, listening on port {port}")

    # Blocks the calling thread for the lifetime of the server.
    grpc_server.wait_for_termination()
40+
41+
# Script entry point: start the gRPC server on the port parsed from the command line.
serve(args.port)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import pyarrow as pa
2+
import json
3+
from pgwatch_pb2_grpc import ReceiverServicer
4+
from pgwatch_pb2 import Reply
5+
from google.protobuf import json_format
6+
from pyiceberg.catalog import load_catalog
7+
from pyiceberg.schema import Schema
8+
from pyiceberg.partitioning import PartitionSpec, PartitionField
9+
from pyiceberg.transforms import IdentityTransform
10+
from pyiceberg.types import (
11+
NestedField,
12+
StringType
13+
)
14+
15+
class IcebergReceiver(ReceiverServicer):
    """gRPC Receiver servicer that appends pgwatch measurements to an Iceberg table."""

    def __init__(self, icebergDataDir: str):
        """
        Creates pgwatch.metrics table in pgcatalog if it doesn't exist.

        The table is partitioned by the MetricName and DBName fields
        (in that order).

        Args:
            icebergDataDir (str): Local file system dir path to store data at.
        """
        # Function-scope import keeps this class self-contained without
        # touching the file's import section.
        import threading

        catalog = load_catalog("pgcatalog")
        catalog.create_namespace_if_not_exists("pgwatch")

        schema = Schema(
            NestedField(field_id=1, name="DBName", field_type=StringType(), required=True),
            NestedField(field_id=2, name="MetricName", field_type=StringType(), required=True),
            NestedField(field_id=3, name="Data", field_type=StringType(), required=True),
        )

        # source_id refers to the schema field ids above:
        # 2 -> MetricName (first partition level), 1 -> DBName (second level).
        partition_spec = PartitionSpec(
            PartitionField(
                source_id=2, field_id=1000, transform=IdentityTransform(), name="MetricName"
            ),
            PartitionField(
                source_id=1, field_id=1001, transform=IdentityTransform(), name="DBName"
            ),
        )

        tbl = catalog.create_table_if_not_exists(
            identifier=("pgwatch", "metrics"),
            schema=schema,
            location=icebergDataDir,
            partition_spec=partition_spec,
        )

        self.catalog = catalog
        self.tbl = tbl
        # Arrow schema derived once from the table so every append uses
        # exactly the table's column types.
        self.arrow_schema = tbl.schema().as_arrow()
        # The servicer is registered on a thread-pool based gRPC server, so
        # UpdateMeasurements may run concurrently while all handlers share
        # this single Table object. Appends are serialized so that each
        # commit is made against the metadata left by the previous one.
        self._append_lock = threading.Lock()

    def UpdateMeasurements(self, request, context):
        """
        Appends one measurement envelope to the pgwatch.metrics table.

        The rows in ``request.Data`` are converted to dicts and stored as a
        single JSON string in the ``Data`` column, next to the envelope's
        ``DBName`` and ``MetricName``.

        Args:
            request: Measurement envelope exposing ``DBName``, ``MetricName``
                and an iterable ``Data`` of protobuf messages.
            context: gRPC call context (unused).

        Returns:
            Reply: Confirmation message for the caller.
        """
        data = [json_format.MessageToDict(row) for row in request.Data]
        dataJson = json.dumps(data)

        measurement = [{
            "DBName": request.DBName,
            "MetricName": request.MetricName,
            "Data": dataJson,
        }]

        df = pa.Table.from_pylist(measurement, schema=self.arrow_schema)

        # Only the commit itself is serialized; building the Arrow table
        # above touches no shared state and stays outside the lock.
        with self._append_lock:
            self.tbl.append(df)

        return Reply(logmsg="Metrics Inserted in iceberg.")
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import yaml
2+
import os
3+
import pytest
4+
from testcontainers.postgres import PostgresContainer
5+
from pgwatch_pb2 import MeasurementEnvelope, Reply
6+
7+
@pytest.fixture(scope="module", autouse=True)
def setup_catalog(tmp_path_factory):
    """Provide a throwaway PostgreSQL-backed ``pgcatalog`` for the module's tests.

    Starts a ``postgres:16`` container, writes a ``.pyiceberg.yaml`` pointing
    at it into pytest's base temp directory, and sets ``PYICEBERG_HOME`` to
    that directory. The container stays up until the module's tests finish.
    """
    with PostgresContainer("postgres:16") as pg_container:
        base_dir = tmp_path_factory.getbasetemp()
        os.environ["PYICEBERG_HOME"] = base_dir.as_posix()

        # Settings for the single catalog the receiver loads by name.
        pgcatalog_settings = {
            "uri": pg_container.get_connection_url(),
            "type": "sql",
            "init_catalog_tables": True,
        }
        config_contents = {"catalog": {"pgcatalog": pgcatalog_settings}}

        config_path = base_dir / ".pyiceberg.yaml"
        with open(config_path.as_posix(), 'w') as stream:
            yaml.dump(
                config_contents,
                stream,
                default_flow_style=False,
                allow_unicode=True,
            )

        # Yield inside the `with` so the container outlives the tests.
        yield
28+
29+
30+
def test_IcebergReceiver(tmp_path):
    """An envelope sent through UpdateMeasurements ends up as a row in the table."""
    # Imported here rather than at module level so that the
    # `setup_catalog` fixture has already set the `PYICEBERG_HOME`
    # env variable before pyiceberg reads it on init.
    from iceberg_receiver import IcebergReceiver

    receiver = IcebergReceiver(tmp_path.as_posix())
    envelope = MeasurementEnvelope(DBName="test", MetricName="test")

    response = receiver.UpdateMeasurements(envelope, None)
    assert response == Reply(logmsg="Metrics Inserted in iceberg.")

    # Read the table back and check the stored row.
    rows = receiver.tbl.scan().to_arrow().to_pylist()
    first_row = rows[0]

    assert first_row["DBName"] == "test"
    assert first_row["MetricName"] == "test"
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
annotated-types==0.7.0
2+
cachetools==5.5.2
3+
certifi==2025.8.3
4+
charset-normalizer==3.4.3
5+
click==8.2.1
6+
docker==7.1.0
7+
fsspec==2025.7.0
8+
greenlet==3.2.4
9+
grpcio==1.74.0
10+
grpcio-tools==1.74.0
11+
idna==3.10
12+
iniconfig==2.1.0
13+
markdown-it-py==4.0.0
14+
mdurl==0.1.2
15+
mmh3==5.2.0
16+
packaging==25.0
17+
pluggy==1.6.0
18+
protobuf==6.32.0
19+
psycopg2-binary==2.9.10
20+
pyarrow==21.0.0
21+
pydantic==2.11.7
22+
pydantic_core==2.33.2
23+
Pygments==2.19.2
24+
pyiceberg==0.9.1
25+
pyparsing==3.2.3
26+
pytest==8.4.1
27+
python-dateutil==2.9.0.post0
28+
python-dotenv==1.1.1
29+
PyYAML==6.0.2
30+
requests==2.32.5
31+
rich==13.9.4
32+
setuptools==80.9.0
33+
six==1.17.0
34+
sortedcontainers==2.4.0
35+
SQLAlchemy==2.0.43
36+
strictyaml==1.7.3
37+
tenacity==9.1.2
38+
testcontainers==4.12.0
39+
typing-inspection==0.4.1
40+
typing_extensions==4.14.1
41+
urllib3==2.5.0
42+
wrapt==1.17.3

0 commit comments

Comments
 (0)