|
| 1 | +--- |
| 2 | +id: python-external-stream |
| 3 | +title: Python External Stream |
| 4 | +--- |
| 5 | + |
| 6 | +# Python External Stream |
| 7 | + |
| 8 | +Python External Stream lets you read from and write to arbitrary sources by embedding a Python body directly in the DDL. It is available in **Timeplus Enterprise 3.1.1+**. |
| 9 | + |
| 10 | +Unlike the Kafka, Pulsar, and NATS JetStream external streams — which speak a specific wire protocol — a Python External Stream is a generic escape hatch: you bring the protocol, the client library, and the logic. Timeplus calls your functions inside the embedded CPython runtime and treats return values as row batches. The same DDL object can serve as both a source (via `read_function_name`) and a sink (via `write_function_name`). |
| 11 | + |
| 12 | +## Minimum version matrix |
| 13 | + |
| 14 | +| Feature | Minimum Timeplus Enterprise version | |
| 15 | +|---|---| |
| 16 | +| `type='python'` external stream (read + write) | 3.1.1 | |
| 17 | +| `init_function_name`, `deinit_function_name`, `init_function_parameters` lifecycle hooks | 3.2.1 | |
| 18 | +| `__timeplus_local_api_user` / `__timeplus_local_api_password` injected globals | 3.2.2 | |
| 19 | + |
| 20 | +## Syntax |
| 21 | + |
| 22 | +```sql |
| 23 | +CREATE EXTERNAL STREAM [IF NOT EXISTS] stream_name (<col_name1> <col_type>) |
| 24 | +AS $$ |
| 25 | +def read_fn(): |
| 26 | + ... |
| 27 | + |
| 28 | +def write_fn(col1, ...): |
| 29 | + ... |
| 30 | + |
| 31 | +def init_fn(config): # optional, 3.2.1+ |
| 32 | + ... |
| 33 | + |
| 34 | +def deinit_fn(): # optional, 3.2.1+ |
| 35 | + ... |
| 36 | +$$ |
| 37 | +SETTINGS |
| 38 | + type = 'python', -- required |
| 39 | + read_function_name = '..', -- defaults to the stream name |
| 40 | + write_function_name = '..', -- defaults to read_function_name |
| 41 | + init_function_name = '..', -- 3.2.1+ |
| 42 | + init_function_parameters = '..', -- 3.2.1+ (requires init_function_name) |
| 43 | + deinit_function_name = '..', -- 3.2.1+ |
| 44 | + mode = 'auto' -- 'auto' (default), 'streaming', or 'batch' |
| 45 | +``` |
| 46 | + |
| 47 | +## Settings |
| 48 | + |
| 49 | +* **type**: must be `'python'`. Required. |
| 50 | +* **read_function_name**: name of the Python function used when the stream is read from. Defaults to the stream name. |
| 51 | +* **write_function_name**: name of the Python function used when the stream is written to (sink). Defaults to `read_function_name`. |
| 52 | +* **init_function_name** *(Timeplus Enterprise 3.2.1+)*: name of a Python function called once before read/write processing begins. Use it to open connections, warm caches, or stash state on `builtins` for the entry function to consume. |
| 53 | +* **init_function_parameters** *(Timeplus Enterprise 3.2.1+)*: a single string passed as the only argument to the init function. Any format works (JSON, `key=value`, or a plain string) — parsing is up to your Python code. Requires `init_function_name`; otherwise the stream fails to create with `Setting 'init_function_parameters' requires 'init_function_name' to be configured`. |
| 54 | +* **deinit_function_name** *(Timeplus Enterprise 3.2.1+)*: name of a Python function called once after read/write processing completes, for cleanup. |
| 55 | +* **mode**: Python table execution mode — `'auto'` (default), `'streaming'`, or `'batch'`. |
| 56 | + |
| 57 | +:::info |
| 58 | +Attempting to use `init_function_name`, `deinit_function_name`, or `init_function_parameters` on versions earlier than 3.2.1 fails with: |
| 59 | +``` |
| 60 | +Code: 115. DB::Exception: Unknown setting init_function_name: for storage ExternalStream. |
| 61 | +``` |
| 62 | +Upgrade to 3.2.1 or later to use these hooks. |
| 63 | +::: |
| 64 | + |
| 65 | +## Local API credentials *(Timeplus Enterprise 3.2.2+)* |
| 66 | + |
| 67 | +When the local API user is enabled on the server, Timeplus injects two module-level globals into every Python External Stream module so your code can authenticate back to the same timeplusd over the native TCP protocol or the REST HTTP interface without hard-coding credentials: |
| 68 | + |
| 69 | +* `__timeplus_local_api_user` — the ephemeral local API username. |
| 70 | +* `__timeplus_local_api_password` — the matching token. Treat this as a secret; do not log it. |
| 71 | + |
| 72 | +Both globals are available as bare names inside the Python body — no `os.environ` lookup needed. They are regenerated on every server restart and never written to disk. |
| 73 | + |
| 74 | +## Example: init / deinit hooks |
| 75 | + |
| 76 | +```sql |
| 77 | +CREATE EXTERNAL STREAM py_cookie_counter |
| 78 | +( |
| 79 | + previous_cleanup_count int32, |
| 80 | + secret_flavor string |
| 81 | +) |
| 82 | +AS $$ |
| 83 | +import builtins, json |
| 84 | + |
| 85 | +def open_bakery(config): |
| 86 | + builtins._tp_cookie_secret_flavor = json.loads(config)["flavor"] |
| 87 | + |
| 88 | +def close_bakery(): |
| 89 | + if hasattr(builtins, "_tp_cookie_secret_flavor"): |
| 90 | + del builtins._tp_cookie_secret_flavor |
| 91 | + |
| 92 | +def serve_cookie_report(): |
| 93 | + return [(0, getattr(builtins, "_tp_cookie_secret_flavor", ""))] |
| 94 | +$$ |
| 95 | +SETTINGS |
| 96 | + type = 'python', |
| 97 | + read_function_name = 'serve_cookie_report', |
| 98 | + init_function_name = 'open_bakery', |
| 99 | + init_function_parameters = '{"flavor":"double-chocolate"}', |
| 100 | + deinit_function_name = 'close_bakery'; |
| 101 | +``` |
| 102 | + |
| 103 | +## Related |
| 104 | + |
| 105 | +- [Python UDF](/py-udf) — the embedded Python runtime, library management, and data-type mapping shared with Python External Stream. |
| 106 | +- [CREATE EXTERNAL STREAM](/sql-create-external-stream) — SQL reference for all external stream types. |
0 commit comments