Commit 717f876

add nats jetstream external stream documents (#626)
* add nats jetstream external stream documents
* minor fx

---------

Co-authored-by: Ken Chen <zlchen.ken@gmail.com>
1 parent acf3e26 commit 717f876

10 files changed

Lines changed: 585 additions & 33 deletions

docs/connect-data-in.md

Lines changed: 7 additions & 0 deletions
@@ -4,6 +4,7 @@ Timeplus supports multiple ways to load data into the system, or access the exte
44

55
- [External Stream for Apache Kafka](/external-stream), Confluent, Redpanda, and other Kafka API compatible data streaming platforms. This feature is also available in Timeplus Proton.
66
- [External Stream for Apache Pulsar](/pulsar-source) is available in Timeplus Enterprise 2.5 and above.
7+
- [External Stream for NATS JetStream](/nats-jetstream-source) is available for NATS messaging system with JetStream enabled.
78
- Source for a wider range of data sources, available only in Timeplus Enterprise. It integrates with [Redpanda Connect](https://redpanda.com/connect), which supports 200+ connectors.
89
- On Timeplus web console, you can also [upload CSV files](#csv) and import them into streams.
910
- For Timeplus Enterprise, [REST API](/ingest-api) and SDKs are provided to push data to Timeplus programmatically.
@@ -41,6 +42,12 @@ Apache® Pulsar™ is a cloud-native, distributed, open source messaging and str
4142

4243
[Learn more.](/pulsar-source)
4344

45+
### Load streaming data from NATS JetStream {#nats}
46+
47+
[NATS](https://nats.io/) is a high-performance, lightweight messaging system. NATS JetStream provides durable, replayable message streams. With NATS JetStream External Streams, you can read data from or write data to NATS JetStream without moving the data.
48+
49+
[Learn more.](/nats-jetstream-source)
50+
4451
### Upload local files {#csv}
4552

4653
If you have some static dataset or lookup tables in the CSV format, you can upload the files directly to Timeplus.

docs/external-stream.md

Lines changed: 2 additions & 1 deletion
@@ -4,9 +4,10 @@ You can create **External Streams** in Timeplus to query data in the external sy
44

55
You can run streaming analytics on external streams in the same way as on other streams.
66

7-
Timeplus supports 4 types of external streams:
7+
Timeplus supports 5 types of external streams:
88
* [Kafka External Stream](/kafka-source)
99
* [Pulsar External Stream](/pulsar-source)
10+
* [NATS JetStream External Stream](/nats-jetstream-source)
1011
* [Timeplus External Stream](/timeplus-source), only available in Timeplus Enterprise
1112
* [Log External Stream](/log-stream) (experimental)
1213

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
1+
---
2+
id: nats-jetstream-sink
3+
title: NATS JetStream Sink
4+
---
5+
6+
import ExternalNATSBasics from './shared/nats-jetstream-external-stream.md';
7+
import ExternalNATSWrite from './shared/nats-jetstream-external-stream-write.md';
8+
9+
<ExternalNATSBasics />
10+
<ExternalNATSWrite />
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
1+
---
2+
id: nats-jetstream-source
3+
title: NATS JetStream Source
4+
---
5+
6+
import ExternalNATSBasics from './shared/nats-jetstream-external-stream.md';
7+
import ExternalNATSRead from './shared/nats-jetstream-external-stream-read.md';
8+
9+
<ExternalNATSBasics />
10+
<ExternalNATSRead />

docs/send-data-out.md

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ With Timeplus Console, you can easily explore and analyze streaming data, with i
55
Timeplus supports various systems as the downstreams:
66
* [Send data to Kafka topics](#kafka)
77
* [Send data to Pulsar topics](/pulsar-sink)
8+
* [Send data to NATS JetStream](/nats-jetstream-sink)
89
* [Send data to ClickHouse tables](/clickhouse-external-table#write)
910
* [Send data to another Timeplus deployment](/timeplus-source)
1011
* [Send data to Webhook endpoints](#webhook)
Lines changed: 225 additions & 0 deletions
@@ -0,0 +1,225 @@
1+
## Read Data from NATS JetStream
2+
3+
Timeplus allows reading NATS JetStream messages in multiple data formats, including:
4+
5+
* Plain string (raw)
6+
* CSV / TSV
7+
* JSON
8+
* Protobuf
9+
* Avro
10+
11+
### Read NATS Messages as Raw String
12+
13+
Use this mode when:
14+
15+
* Messages contain **unstructured text or binary data**
16+
* No built-in format is applicable
17+
* You want to **debug raw NATS messages**
18+
19+
#### Raw String Example
20+
```sql
CREATE EXTERNAL STREAM ext_application_logs (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='application_logs',
         subject='app.logs.*';
```
28+
29+
You can further process the raw string with regular-expression functions or JSON extract functions.
30+
31+
#### Regex Example – Parse Application Logs
32+
```sql
SELECT
    to_time(extract(raw, '^(\\d{4}\\.\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d+)')) AS timestamp,
    extract(raw, '} <(\\w+)>') AS level,
    extract(raw, '} <\\w+> (.*)') AS message
FROM ext_application_logs;
```
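
The same `extract` pattern can also be used in a filter. The following is a minimal sketch, assuming the log level appears in the `} <level>` position as in the example above; the level value `'Error'` is a hypothetical illustration, not something defined on this page.

```sql
-- Keep only log lines whose extracted level equals 'Error'.
-- 'Error' is an assumed level name; adjust it to match your log format.
SELECT raw
FROM ext_application_logs
WHERE extract(raw, '} <(\\w+)>') = 'Error';
```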
40+
41+
### Read JSON NATS Messages
42+
43+
Assuming NATS messages contain JSON text with this schema:
44+
```json
{
  "actor": string,
  "created_at": timestamp,
  "id": string,
  "payload": string,
  "repo": string,
  "type": string
}
```
55+
56+
You can process JSON in two ways:
57+
58+
#### Option A: Parse with JSON Extract Functions
59+
60+
1. Create a raw stream:
61+
```sql
CREATE EXTERNAL STREAM ext_json_raw (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='github_events',
         subject='github.events.>';
```
69+
70+
2. Extract fields using JSON extract shortcut syntax or [JSON extract functions](/functions_for_json):
71+
```sql
SELECT
    raw:actor AS actor,
    raw:created_at::datetime64(3, 'UTC') AS created_at,
    raw:id AS id,
    raw:payload AS payload,
    raw:repo AS repo,
    raw:type AS type
FROM ext_json_raw;
```
82+
83+
This method is the most flexible and works well for dynamic JSON where fields may be added or missing. It can also extract nested JSON fields.
84+
85+
#### Option B: Use JSONEachRow Format
86+
87+
Define a NATS JetStream external stream with columns mapped to the JSON fields and specify `data_format='JSONEachRow'`:
88+
```sql
CREATE EXTERNAL STREAM ext_json_parsed
(
    actor string,
    created_at datetime64(3, 'UTC'),
    id string,
    payload string,
    repo string,
    type string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='github_events',
         subject='github.events',
         data_format='JSONEachRow';
```
105+
106+
When you query the stream, JSON fields are parsed and cast to the target column types automatically.
107+
108+
This method is the most convenient when the JSON schema is stable. It applies to top-level JSON fields.
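
As an illustration of querying the parsed stream, the sketch below filters on one of the typed columns. The event type value `'PushEvent'` is an assumption made for the example; it is not defined on this page.

```sql
-- Streaming query over the columns declared in ext_json_parsed.
-- 'PushEvent' is a hypothetical value of the `type` field.
SELECT created_at, actor, repo
FROM ext_json_parsed
WHERE type = 'PushEvent';
```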
109+
110+
### Read CSV NATS Messages
111+
112+
Similar to `JSONEachRow`, you can read CSV formatted messages:
113+
```sql
CREATE EXTERNAL STREAM ext_csv_parsed
(
    actor string,
    created_at datetime64(3, 'UTC'),
    id string,
    payload string,
    repo string,
    type string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='csv_stream',
         subject='csv.data',
         data_format='CSV';
```
130+
131+
### Read TSV NATS Messages
132+
133+
Identical to CSV, but expects **tab-separated values**:
134+
```sql
-- Fragment only: use the same stream definition as the CSV example and change the format.
SETTINGS data_format='TSV';
```
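
For completeness, a full definition might look like the sketch below. It assumes the same columns as the CSV example; the external stream name `ext_tsv_parsed`, the JetStream stream name `tsv_stream`, and the subject `tsv.data` are hypothetical.

```sql
CREATE EXTERNAL STREAM ext_tsv_parsed
(
    actor string,
    created_at datetime64(3, 'UTC'),
    id string,
    payload string,
    repo string,
    type string
)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='tsv_stream',  -- hypothetical JetStream stream name
         subject='tsv.data',        -- hypothetical subject
         data_format='TSV';
```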
138+
139+
### Read Avro or Protobuf Messages
140+
141+
To read Avro-encoded or Protobuf-encoded NATS messages, refer to the [Schema](/timeplus-format-schema) documentation.
142+
143+
### Access NATS Message Metadata
144+
145+
Timeplus provides **virtual columns** for NATS JetStream message metadata.
146+
| Virtual Column | Description | Type |
|----------------|-------------|------|
| `_tp_time` | NATS message timestamp | `datetime64(3, 'UTC')` |
| `_tp_append_time` | Message append time | `datetime64(3, 'UTC')` |
| `_tp_process_time` | Processing time | `datetime64(3, 'UTC')` |
| `_tp_sn` | Stream sequence number | `int64` |
| `_tp_shard` | Always 0 for NATS | `int32` |
| `_tp_message_headers` | NATS headers as key-value map | `map(string, string)` |
| `_nats_subject` | NATS subject | `string` |
156+
157+
### NATS Message Metadata Examples
158+
```sql
-- View message time and payload
SELECT _tp_time, raw FROM ext_github_events;

-- View message subject
SELECT _nats_subject, raw FROM ext_github_events;

-- Access headers
SELECT _tp_message_headers['trace_id'], raw FROM ext_github_events;

-- View sequence number
SELECT _tp_sn, raw FROM ext_github_events;
```
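
The examples above query `ext_github_events`, which this page does not define. A minimal sketch of such a stream follows, assuming it reuses the `github_events` stream and wildcard subject from the JSON examples. The second statement filters on the subject virtual column, assuming virtual columns can be used in a `WHERE` clause; the subject value `github.events.push` is hypothetical.

```sql
-- Assumed definition of the stream used in the metadata examples.
CREATE EXTERNAL STREAM ext_github_events (raw string)
SETTINGS type='nats_jetstream',
         url='nats://localhost:4222',
         stream_name='github_events',
         subject='github.events.>';

-- Keep only messages published on one concrete subject (hypothetical name).
SELECT _nats_subject, _tp_sn, raw
FROM ext_github_events
WHERE _nats_subject = 'github.events.push';
```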
172+
173+
### Query Settings for NATS JetStream External Streams
174+
175+
#### Controlling Where to Start Reading
176+
177+
Use the `seek_to` query setting to control where to start consuming messages.
178+
179+
##### Start from Earliest (All Messages)
180+
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='earliest';
```
184+
185+
For non-streaming queries (using the `table()` function), `seek_to` defaults to `'earliest'`.
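
A non-streaming query can be sketched as follows, using the same placeholder stream name `ext_stream` as the other examples in this section. Because no `seek_to` is given, the default described above applies.

```sql
-- Bounded query: wraps the external stream in table() so the query
-- reads the messages already in the stream and then returns.
SELECT count() FROM table(ext_stream);
```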
186+
187+
##### Start from Latest (New Messages Only)
188+
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='latest';
```
192+
193+
For streaming queries, `seek_to` defaults to `'latest'`.
194+
195+
##### Start from Specific Stream Sequence Number
196+
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='1000';
```
200+
201+
This starts reading from sequence number 1000.
202+
203+
##### Start from Specific Timestamp
204+
```sql
SELECT raw FROM ext_stream SETTINGS seek_to='2025-01-01T00:00:00.000';
```
208+
209+
Timeplus converts the timestamp to the appropriate starting point in the stream.
210+
211+
#### record_consume_timeout_ms
212+
213+
Use `record_consume_timeout_ms` to determine how long the external stream waits for new messages before returning results. Smaller values reduce latency but may impact performance.
214+
```sql
SELECT raw FROM ext_stream SETTINGS record_consume_timeout_ms=100;
```
218+
219+
#### record_consume_batch_count
220+
221+
Use `record_consume_batch_count` to control the number of messages fetched in each batch. Default is `1000`.
222+
```sql
SELECT raw FROM ext_stream SETTINGS record_consume_batch_count=500;
```
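
The query settings in this section can presumably be combined in one query. The sketch below assumes they are accepted together in a single comma-separated `SETTINGS` clause; the values are taken from the individual examples above and are illustrative only.

```sql
-- Replay from the beginning, with a short wait time and a smaller batch size.
SELECT raw
FROM ext_stream
SETTINGS seek_to='earliest',
         record_consume_timeout_ms=100,
         record_consume_batch_count=500;
```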
