Skip to content

Commit 8a3a77b

Browse files
author
Michael
committed
Change return logic
1 parent c08df6e commit 8a3a77b

1 file changed

Lines changed: 51 additions & 41 deletions

File tree

substreams/substream.py

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
DEFAULT_ENDPOINT = "api.streamingfast.io:443"
1818

19-
2019
def retrieve_class(module_name: str, class_name: str):
2120
module = import_module(module_name)
2221
return getattr(module, class_name)
@@ -172,32 +171,40 @@ def poll(
172171
initial_snapshot: bool = False,
173172
return_type: str = "df"
174173
):
175-
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
176-
for module in output_modules:
177-
if module not in self.output_modules:
178-
raise Exception(f"module '{module}' is not supported for {self.name}")
179-
if self.output_modules[module].get('is_map') is False:
180-
raise Exception(f"module '{module}' is not a map module")
181-
self._class_from_module(module)
182-
183-
stream = self.service.Blocks(
184-
Request(
185-
start_block_num=start_block,
186-
stop_block_num=end_block,
187-
fork_steps=[STEP_IRREVERSIBLE],
188-
modules=self.pkg.modules,
189-
output_modules=output_modules,
190-
initial_store_snapshot_for_modules=output_modules
191-
if initial_snapshot
192-
else None,
193-
)
194-
)
195-
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
196-
results = []
197-
data_block = 0
174+
198175
module_name = ""
176+
has_error = False
177+
return_dict_interface = {"data": [], "module_name": module_name, "data_block": start_block, "error": None}
178+
valid_return_types = ["dict", "df"]
179+
results = []
180+
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
199181

200182
try:
183+
return_type = return_type.lower()
184+
if return_type not in valid_return_types:
185+
return_type = "df"
186+
187+
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
188+
for module in output_modules:
189+
if module not in self.output_modules:
190+
raise Exception(f"module '{module}' is not supported for {self.name}")
191+
if self.output_modules[module].get('is_map') is False:
192+
raise Exception(f"module '{module}' is not a map module")
193+
self._class_from_module(module)
194+
195+
stream = self.service.Blocks(
196+
Request(
197+
start_block_num=start_block,
198+
stop_block_num=end_block,
199+
fork_steps=[STEP_IRREVERSIBLE],
200+
modules=self.pkg.modules,
201+
output_modules=output_modules,
202+
initial_store_snapshot_for_modules=output_modules
203+
if initial_snapshot
204+
else None,
205+
)
206+
)
207+
201208
for response in stream:
202209
snapshot = MessageToDict(response.snapshot_data)
203210
data = MessageToDict(response.data)
@@ -215,32 +222,35 @@ def poll(
215222
parsed = self._parse_data_outputs(data, output_modules)
216223
module_name = data["outputs"][0]["name"]
217224
raw_results[module_name]["data"].extend(parsed)
218-
data_block = data["clock"]["number"]
225+
return_dict_interface["data_block"] = data["clock"]["number"]
219226
if len(parsed) > 0:
220-
parsed = [dict(item, **{'block':data_block}) for item in parsed]
227+
parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed]
221228
if return_first_result is True:
222229
break
223230
if callable(stream_callback):
224231
stream_callback(module_name, parsed)
225-
else:
226-
continue
227-
228-
# Want to refactir the below logic to create a singlepoint of return and provide more consistency
229-
if return_first_result is True:
232+
233+
if return_first_result is True and parsed:
234+
return_dict_interface["data"] = parsed
230235
if return_type == "dict":
231-
return {"data": parsed, "module_name": module_name, "data_block": data_block}
232-
elif return_type == "df":
233-
return pd.DataFrame(parsed)
234-
for output_module in output_modules:
235-
result = SubstreamOutput(module_name=output_module)
236-
data_dict: dict = raw_results.get(output_module)
236+
results = return_dict_interface
237+
if return_type == "df":
238+
results = pd.DataFrame(parsed)
239+
elif raw_results:
240+
result = SubstreamOutput(module_name=output_modules[0])
241+
data_dict: dict = raw_results.get(output_modules[0])
237242
for k, v in data_dict.items():
238243
df = pd.DataFrame(v)
239-
df["output_module"] = output_module
244+
df["output_module"] = output_modules[0]
240245
setattr(result, k, df)
241246
results.append(result)
242-
if return_type == "dict":
243-
results = results.to_dict()
247+
if return_type == "dict":
248+
return_dict_interface["data"] = results.to_dict()
249+
results = return_dict_interface
250+
else:
251+
raise Exception("No Valid Data Results Returned by Substream")
244252
except Exception as err:
245-
results = {"error": err}
253+
has_error = True
254+
return_dict_interface["error"] = err
255+
results = return_dict_interface
246256
return results

0 commit comments

Comments
 (0)