Skip to content

Commit c08df6e

Browse files
author
Michael
committed
Initial dict/df return routing
1 parent 0b154de commit c08df6e

1 file changed

Lines changed: 9 additions & 11 deletions

File tree

substreams/substream.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,7 @@ def poll(
170170
stream_callback: Optional[callable] = None,
171171
return_first_result: bool = False,
172172
initial_snapshot: bool = False,
173-
highest_processed_block: int = 0,
174-
return_progress: bool = False,
173+
return_type: str = "df"
175174
):
176175
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
177176
for module in output_modules:
@@ -202,7 +201,6 @@ def poll(
202201
for response in stream:
203202
snapshot = MessageToDict(response.snapshot_data)
204203
data = MessageToDict(response.data)
205-
progress = MessageToDict(response.progress)
206204
session = MessageToDict(response.session)
207205

208206
if session:
@@ -226,15 +224,13 @@ def poll(
226224
stream_callback(module_name, parsed)
227225
else:
228226
continue
229-
elif progress and return_progress is True:
230-
if 'processedBytes' in progress["modules"][0] or 'processedRanges' not in progress["modules"][0]:
231-
continue
232-
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
233-
data_block = endBlock
234-
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
235-
return {"block": int(endBlock)}
227+
228+
# Want to refactir the below logic to create a singlepoint of return and provide more consistency
236229
if return_first_result is True:
237-
return {"data": parsed, "module_name": module_name, "data_block": data_block}
230+
if return_type == "dict":
231+
return {"data": parsed, "module_name": module_name, "data_block": data_block}
232+
elif return_type == "df":
233+
return pd.DataFrame(parsed)
238234
for output_module in output_modules:
239235
result = SubstreamOutput(module_name=output_module)
240236
data_dict: dict = raw_results.get(output_module)
@@ -243,6 +239,8 @@ def poll(
243239
df["output_module"] = output_module
244240
setattr(result, k, df)
245241
results.append(result)
242+
if return_type == "dict":
243+
results = results.to_dict()
246244
except Exception as err:
247245
results = {"error": err}
248246
return results

0 commit comments

Comments
 (0)