| 1 | +## Write Data to a Python External Stream |
| 2 | + |
| 3 | +The write function is invoked once per chunk, not once per row. Its arguments are **column-oriented**: one Python list per output column, in declared order, all of equal length. Iterate with `zip` to recover row tuples. |
| 4 | + |
| 5 | +### Sink basics |
| 6 | + |
| 7 | +```sql |
| 8 | +CREATE EXTERNAL STREAM py_metric_sink (host string, value float32) |
| 9 | +AS $$ |
| 10 | +def py_metric_sink(host, value): |
| 11 | +    for h, v in zip(host, value):
| 12 | +        print(f"{h}={v}")
| 13 | +$$ |
| 14 | +SETTINGS type = 'python'; |
| 15 | +``` |
| 16 | + |
| 17 | +Insert a few rows: |
| 18 | + |
| 19 | +```sql |
| 20 | +INSERT INTO py_metric_sink (host, value) VALUES ('a', 1.0), ('b', 2.0); |
| 21 | +``` |
| 22 | + |
| 23 | +Behind the scenes Timeplus calls `py_metric_sink(['a', 'b'], [1.0, 2.0])` — one call carrying both rows. A larger INSERT or a downstream query that delivers many chunks results in one call per chunk. |
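The per-chunk, column-oriented calling convention described above can be tried outside Timeplus with a small simulation. This is only a sketch: `deliver` and `chunk_size` are hypothetical stand-ins for the engine's chunking, not Timeplus APIs, and the sink returns its lines instead of printing them so the result is easy to inspect.

```python
def py_metric_sink(host, value):
    # Same shape as the sink above: one list per column, equal lengths.
    return [f"{h}={v}" for h, v in zip(host, value)]


def deliver(rows, chunk_size, write_fn):
    """Hypothetical stand-in for the engine: one write_fn call per chunk."""
    calls = []
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        # Pivot row tuples into one list per column, in declared order.
        columns = [list(col) for col in zip(*chunk)]
        calls.append(write_fn(*columns))
    return calls


rows = [("a", 1.0), ("b", 2.0), ("c", 3.0)]
calls = deliver(rows, chunk_size=2, write_fn=py_metric_sink)
# Three rows with an assumed chunk size of 2 produce two calls.
```

The actual chunk boundaries are chosen by Timeplus, so the write function should not assume any particular number of rows per call.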
| 24 | + |
| 25 | +If `write_function_name` is omitted, Timeplus falls back to `read_function_name`, which itself defaults to the stream name. That is why the Python function above, which shares its name with the stream, needs no explicit setting.
| 26 | + |
| 27 | +### Materialized view → external stream |
| 28 | + |
| 29 | +Routing a continuous query into a sink is the most common production pattern. Define the sink once, then point a materialized view at it: |
| 30 | + |
| 31 | +```sql |
| 32 | +CREATE EXTERNAL STREAM py_alert_sink (host string, value float32) |
| 33 | +AS $$ |
| 34 | +def py_alert_sink(host, value): |
| 35 | +    for h, v in zip(host, value):
| 36 | +        notify(h, v)  # your notifier
| 37 | +$$ |
| 38 | +SETTINGS type = 'python'; |
| 39 | + |
| 40 | +CREATE MATERIALIZED VIEW high_value_alerts INTO py_alert_sink AS |
| 41 | + SELECT host, value FROM metrics WHERE value > 100; |
| 42 | +``` |
| 43 | + |
| 44 | +The materialized view feeds chunks into the sink as they are produced; each chunk becomes one call to `py_alert_sink`. |
| 45 | + |
| 46 | +### Custom protocol example: webhook POST |
| 47 | + |
| 48 | +Load the destination URL in init, reuse that configuration for every chunk, and clear it in deinit. Init parameters carry the URL so the Python body is reusable across environments. To pool an actual HTTP connection, swap `urllib` for a session-aware client (for example `requests.Session()`) and stash the session itself on `builtins`. |
| 49 | + |
| 50 | +```sql |
| 51 | +CREATE EXTERNAL STREAM py_webhook (event_id string, body string) |
| 52 | +AS $$ |
| 53 | +import builtins, json, urllib.request |
| 54 | + |
| 55 | +def open_client(config): |
| 56 | +    builtins._tp_webhook = json.loads(config)["url"]
| 57 | + |
| 58 | +def close_client(): |
| 59 | +    if hasattr(builtins, "_tp_webhook"):
| 60 | +        del builtins._tp_webhook
| 61 | + |
| 62 | +def post_event(event_id, body): |
| 63 | +    for eid, b in zip(event_id, body):
| 64 | +        req = urllib.request.Request(
| 65 | +            builtins._tp_webhook,
| 66 | +            data=json.dumps({"id": eid, "body": b}).encode(),
| 67 | +            headers={"Content-Type": "application/json"},
| 68 | +            method="POST",
| 69 | +        )
| 70 | +        urllib.request.urlopen(req).read()
| 71 | +$$ |
| 72 | +SETTINGS |
| 73 | + type = 'python', |
| 74 | + init_function_name = 'open_client', |
| 75 | + init_function_parameters = '{"url":"https://hooks.example.com/notify"}', |
| 76 | + deinit_function_name = 'close_client', |
| 77 | + write_function_name = 'post_event'; |
| 78 | +``` |
| 79 | + |
| 80 | +Replace `urllib` with any HTTP, S3, queue, or proprietary client your environment ships with. Manage Python dependencies through the [Python UDF](/py-udf) library configuration — the same runtime backs both features. |
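As one possible way to hold a real connection across chunks using only the standard library, the three functions could stash an `http.client` connection on `builtins` instead of the bare URL. This is a sketch, not the documented pattern: the `_tp_webhook_conn` and `_tp_webhook_path` attribute names and the 10-second timeout are assumptions, and whether the socket is actually kept alive depends on the server.

```python
import builtins
import http.client
import json
from urllib.parse import urlsplit


def open_client(config):
    url = urlsplit(json.loads(config)["url"])
    conn_cls = (http.client.HTTPSConnection if url.scheme == "https"
                else http.client.HTTPConnection)
    # Creating the object opens no socket; it connects on the first request.
    builtins._tp_webhook_conn = conn_cls(url.hostname, url.port, timeout=10)
    builtins._tp_webhook_path = url.path or "/"


def close_client():
    conn = getattr(builtins, "_tp_webhook_conn", None)
    if conn is not None:
        conn.close()
        del builtins._tp_webhook_conn
        del builtins._tp_webhook_path


def post_event(event_id, body):
    conn = builtins._tp_webhook_conn
    for eid, b in zip(event_id, body):
        conn.request(
            "POST",
            builtins._tp_webhook_path,
            body=json.dumps({"id": eid, "body": b}).encode(),
            headers={"Content-Type": "application/json"},
        )
        # Drain the response so the same connection can carry the next request.
        conn.getresponse().read()
```

The `SETTINGS` clause stays the same as in the `urllib` version, since the function names are unchanged.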
| 81 | + |
| 82 | +### Failure behavior |
| 83 | + |
| 84 | +If the write function raises, the INSERT fails and the Python traceback is included in the error response. Side effects already performed by your Python code (HTTP requests sent, files written, queue messages published) are **not** rolled back by Timeplus — design idempotent writes, or batch your side effect inside a single transactional call your downstream system controls. |
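One way to approach idempotent writes is to derive a deterministic key from each row, so that a retried INSERT produces the same key and the receiver can discard duplicates. The sketch below only builds the request payloads. It assumes the downstream service honors an `Idempotency-Key` style header, which is a property of the receiver and not something Timeplus provides; `idempotency_key` and `build_requests` are hypothetical helper names.

```python
import hashlib
import json


def idempotency_key(event_id, body):
    # Same row content always hashes to the same key, so retries collide.
    digest = hashlib.sha256()
    digest.update(event_id.encode())
    digest.update(b"\x00")  # separator so ("ab", "c") differs from ("a", "bc")
    digest.update(body.encode())
    return digest.hexdigest()


def build_requests(event_id, body):
    """Turn one column-oriented chunk into per-row request descriptions."""
    out = []
    for eid, b in zip(event_id, body):
        out.append({
            "headers": {
                "Content-Type": "application/json",
                # Assumed header name; use whatever the receiver expects.
                "Idempotency-Key": idempotency_key(eid, b),
            },
            "data": json.dumps({"id": eid, "body": b}).encode(),
        })
    return out


first = build_requests(["e1", "e2"], ["hello", "world"])
retry = build_requests(["e1", "e2"], ["hello", "world"])
```

Because the key depends only on row content, a chunk that fails halfway and is re-sent carries the same keys for the rows that already went out.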