Commit be99956

Add emit strategy and return shape details for JS and Python UDFs (#572)
1 parent f9bd787 commit be99956

2 files changed

Lines changed: 12 additions & 1 deletion

docs/js-udf.md

Lines changed: 5 additions & 0 deletions
@@ -143,6 +143,11 @@ Let's take an example of a function to get the second maximum values from the gr
 | 5 | deserialize(str) | No | Opposite to serialize(). Read the string and convert back to JS internal state. | function(str)\{<br />let s=JSON.parse(str);<br />this.max=s['max'];<br />this.sec_max=s['sec_max'];<br />} |
 | 6 | merge(str) | No | Merges two states into one. Used for multiple shards processing. | function(str)\{<br />let s=JSON.parse(str);<br />if..else..} |
 
+**Emit strategy and return shape**
+
+- Default (no `has_customized_emit`): `finalize()` must return a single value matching the declared return type. Any value returned from `process()` is ignored.
+- Custom emit (`has_customized_emit: true`): `process()` should return an integer (or `true`/`false`) indicating how many results to emit now, and `finalize()` must return an array whose length equals that emit count.
+
 
 
 ### Example: get second largest number {#udaf-example}

docs/py-udf.md

Lines changed: 7 additions & 1 deletion
@@ -99,6 +99,12 @@ The function list:
 * `deserialize` load the state from checkpoint to the internal state, optional.
 * `merge` for multi-shard processing, merge the states from each shard, optional.
 
+**Emit strategy and return shape**
+
+- By default, Proton calls `finalize()` once and expects a *single* value of the declared return type (not a list). Any value returned from `process()` is ignored in this mode.
+- If you need to emit multiple results per group, set `self.has_customized_emit = True` in `__init__`, return either an integer count or `True`/`False` from `process()` to indicate how many results to emit, and return a list from `finalize()` whose length matches that count.
+- Only one UDA with `has_customized_emit = True` is supported per streaming query.
+
 ## Examples
 
 ### A simple UDF without dependency
@@ -166,7 +172,7 @@ class getMax:
             if item > self.max:
                 self.max = item
     def finalize(self):
-        return [self.max]
+        return self.max
 $$;
 ```
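
The two emit modes described in the `docs/py-udf.md` hunk can be sketched side by side in plain Python. This is only an illustration of the return-shape contract, not Proton's implementation: `GetMax` mirrors the `getMax` class touched by this commit, while `EmitOnNewMax` and the `run_batch` driver are hypothetical names introduced here, and the assumption that the engine calls `finalize()` right after a non-zero count is ours.

```python
class GetMax:
    """Default emit: finalize() returns a single scalar, not a list."""

    def __init__(self):
        self.max = float("-inf")

    def process(self, values):
        for item in values:
            if item > self.max:
                self.max = item
        # No return value: in default mode it would be ignored anyway.

    def finalize(self):
        return self.max


class EmitOnNewMax:
    """Custom emit (hypothetical example): emit every value that sets a new maximum."""

    def __init__(self):
        self.has_customized_emit = True
        self.max = float("-inf")
        self.pending = []

    def process(self, values):
        for item in values:
            if item > self.max:
                self.max = item
                self.pending.append(item)
        # Tell the caller how many results are ready to emit.
        return len(self.pending)

    def finalize(self):
        # Return a list whose length equals the count reported by process().
        out, self.pending = self.pending, []
        return out


def run_batch(uda, values):
    """Hypothetical stand-in for the engine that checks the return-shape contract."""
    count = uda.process(values)
    if getattr(uda, "has_customized_emit", False):
        count = int(count or 0)  # True/False are treated as 1/0
        if count == 0:
            return []
        results = uda.finalize()
        assert isinstance(results, list) and len(results) == count
        return results
    result = uda.finalize()
    assert not isinstance(result, list), "default mode expects a scalar"
    return [result]
```

With this driver, the pre-commit `return [self.max]` would trip the default-mode assertion, which is the mismatch the `finalize()` change in this commit removes.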
