Skip to content

Commit 490bc30

Browse files
author
Michael
committed
Error Handling + replace module_name list with str
1 parent 8a3a77b commit 490bc30

2 files changed

Lines changed: 32 additions & 43 deletions

File tree

README.md

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,33 +39,27 @@ In order to poll the substream, you will need to call the `poll()` function on t
3939
print(sb.output_modules)
4040

4141
# Poll the module and return a list of SubstreamOutput objects in the order of the specified modules
42-
result = sb.poll(["store_swap_events"], start_block=10000835, end_block=10000835+20000)
42+
result = sb.poll(["map_swap_events"], start_block=10000835, end_block=10000835+20000)
4343
```
4444

4545
With the default inputs, this function outputs Pandas Dataframes after streaming all blocks between the start_block and end_block. However depending on how this function is called, a dict object is returned. The `poll()` function has a number of inputs
4646

4747
- output_modules
48-
- List of strings of output modules to stream
48+
- String of the output module to stream
4949
- start_block
5050
- Integer block number to start the polling
5151
- end_block
5252
- Integer block number to end the polling. In theory, there is no max block number as any block number past chain head will stream the blocks in real time. Its recommended to use an end_block far off into the future if building a data app that will be streaming datain real time as blocks finalize, such as block 20,000,000
53-
- stream_callback
54-
- An optional callback function to be passed into the polling function to execute when valid streamed data is received
5553
- return_first_result
5654
- Boolean value that if True will return data on the first block after the start block to have an applicable TX/Event.
5755
- Can be called recursively on the front end while incrementing the start_block to return data as its streamed rather than all data at once after streaming is completed
5856
- Defaults to False
59-
- If True, the data is returned in the format {"data": [], "module_name": String, "data_block": int}
6057
- initial_snapshot
6158
- Boolean value, defaults to False
62-
- highest_processed_block
63-
- Integer block number that is used in measuring indexing and processing progress, in cases where return_progress is True
64-
- Defaults to 0
65-
- return_progress: bool = False,
66-
- Boolean value that if True returns progress in back processing
67-
- Defaults to False
68-
59+
- return_type
60+
- Specifies the type of value to return
61+
- Passing "df" returns the data in a pandas DataFrame
62+
- Passing "dict" returns in the format {"data": [], "module_name": String, "data_block": int, error: str | None}
6963

7064
The result here is the default `SubstreamOutput` object, you can access both the `data` and `snapshots` dataframes by doing:
7165

substreams/substream.py

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,10 @@ def _parse_snapshot_deltas(self, snapshot: dict) -> list[dict]:
125125
for x in snapshot["deltas"].get("deltas", list())
126126
]
127127

128-
def _parse_data_outputs(self, data: dict, module_names: list[str]) -> list[dict]:
128+
def _parse_data_outputs(self, data: dict, module_name: str) -> list[dict]:
129129
outputs = list()
130-
module_set = set(module_names)
131130
for output in data["outputs"]:
132-
if "mapOutput" not in output or output["name"] not in module_set:
131+
if "mapOutput" not in output or output["name"] != module_name:
133132
continue
134133
map_output = output["mapOutput"]
135134
for key, items in map_output.items():
@@ -163,43 +162,41 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:
163162

164163
def poll(
165164
self,
166-
output_modules: list[str],
165+
output_module: str,
167166
start_block: int,
168167
end_block: int,
169-
stream_callback: Optional[callable] = None,
170168
return_first_result: bool = False,
171169
initial_snapshot: bool = False,
172170
return_type: str = "df"
173171
):
174172

175-
module_name = ""
176-
has_error = False
177-
return_dict_interface = {"data": [], "module_name": module_name, "data_block": start_block, "error": None}
173+
return_dict_interface = {"data": [], "module_name": output_module, "data_block": str(start_block), "error": None}
178174
valid_return_types = ["dict", "df"]
179175
results = []
180176
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
181177

182178
try:
179+
if isinstance(output_module, str) is False:
180+
raise Exception("The 'output_module' parameter passed into the poll() function is not a string.")
183181
return_type = return_type.lower()
184182
if return_type not in valid_return_types:
185183
return_type = "df"
186184

187185
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
188-
for module in output_modules:
189-
if module not in self.output_modules:
190-
raise Exception(f"module '{module}' is not supported for {self.name}")
191-
if self.output_modules[module].get('is_map') is False:
192-
raise Exception(f"module '{module}' is not a map module")
193-
self._class_from_module(module)
186+
if output_module not in self.output_modules:
187+
raise Exception(f"module '{output_module}' is not supported for {self.name}")
188+
if self.output_modules[output_module].get('is_map') is False:
189+
raise Exception(f"module '{output_module}' is not a map module")
190+
self._class_from_module(output_module)
194191

195192
stream = self.service.Blocks(
196193
Request(
197194
start_block_num=start_block,
198195
stop_block_num=end_block,
199196
fork_steps=[STEP_IRREVERSIBLE],
200197
modules=self.pkg.modules,
201-
output_modules=output_modules,
202-
initial_store_snapshot_for_modules=output_modules
198+
output_modules=[output_module],
199+
initial_store_snapshot_for_modules=[output_module]
203200
if initial_snapshot
204201
else None,
205202
)
@@ -214,43 +211,41 @@ def poll(
214211
continue
215212

216213
if snapshot:
217-
module_name = snapshot["moduleName"]
218214
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
219-
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
215+
raw_results[output_module]["snapshots"].extend(snapshot_deltas)
220216

221217
if data:
222-
parsed = self._parse_data_outputs(data, output_modules)
223-
module_name = data["outputs"][0]["name"]
224-
raw_results[module_name]["data"].extend(parsed)
218+
parsed = self._parse_data_outputs(data, output_module)
219+
raw_results[output_module]["data"].extend(parsed)
225220
return_dict_interface["data_block"] = data["clock"]["number"]
226221
if len(parsed) > 0:
227222
parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed]
228223
if return_first_result is True:
229224
break
230-
if callable(stream_callback):
231-
stream_callback(module_name, parsed)
225+
elif int(return_dict_interface["data_block"]) + 1 == end_block:
226+
results = return_dict_interface
232227

233228
if return_first_result is True and parsed:
234229
return_dict_interface["data"] = parsed
235230
if return_type == "dict":
236231
results = return_dict_interface
237232
if return_type == "df":
238233
results = pd.DataFrame(parsed)
239-
elif raw_results:
240-
result = SubstreamOutput(module_name=output_modules[0])
241-
data_dict: dict = raw_results.get(output_modules[0])
234+
if return_first_result is False and raw_results:
235+
result = SubstreamOutput(module_name=output_module)
236+
data_dict: dict = raw_results.get(output_module)
242237
for k, v in data_dict.items():
243238
df = pd.DataFrame(v)
244-
df["output_module"] = output_modules[0]
239+
df["output_module"] = output_module
245240
setattr(result, k, df)
246241
results.append(result)
247242
if return_type == "dict":
248243
return_dict_interface["data"] = results.to_dict()
249244
results = return_dict_interface
250-
else:
251-
raise Exception("No Valid Data Results Returned by Substream")
252245
except Exception as err:
253-
has_error = True
254-
return_dict_interface["error"] = err
246+
error_to_pass = err
247+
if isinstance(err, Exception):
248+
error_to_pass = str(err)
249+
return_dict_interface["error"] = error_to_pass
255250
results = return_dict_interface
256251
return results

0 commit comments

Comments
 (0)