Skip to content

Commit 8b87893

Browse files
authored
Merge pull request #4 from MichaelC1999/Additional-Functions
Poll cb and poll first dict, error handling
2 parents b807c87 + 1bb8f97 commit 8b87893

1 file changed

Lines changed: 78 additions & 28 deletions

File tree

substreams/substream.py

Lines changed: 78 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ def _parse_from_string(self, raw: str, key: str, output_class) -> dict:
108108
decoded: bytes = base64.b64decode(raw)
109109
obj = {}
110110
if output_class is None:
111-
# PASS THE VALUE TYPE HERE TO SANITIZE BASE64 DECODE(bigint OR string)
112111
obj["value"] = str(decoded).split("b'")[1].split("'")[0]
113112
if ":" in key:
114113
split_key = key.split(":")
@@ -179,15 +178,17 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:
179178
name_map[mt.name] = pf.name
180179
return name_map
181180

182-
# TODO how do I type annotate this stuff?
183181
def poll(
184182
self,
185183
output_modules: list[str],
186184
start_block: int,
187185
end_block: int,
186+
stream_callback=None,
187+
return_first_result=False,
188188
initial_snapshot=False,
189+
highest_processed_block: int = 0,
190+
return_progress=False
189191
):
190-
# TODO make this general
191192
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
192193

193194
for module in output_modules:
@@ -208,29 +209,78 @@ def poll(
208209
)
209210
)
210211
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
211-
for response in stream:
212-
snapshot = MessageToDict(response.snapshot_data)
213-
data = MessageToDict(response.data)
214-
if snapshot:
215-
module_name: str = snapshot["moduleName"]
216-
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
217-
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
218-
if data:
219-
print("data block #", data["clock"]["number"])
220-
if self.output_modules[module]["is_map"]:
221-
parsed = self._parse_data_outputs(data)
222-
else:
223-
parsed = self._parse_data_deltas(data)
224-
module_name: str = data["outputs"][0]["name"]
225-
raw_results[module_name]["data"].extend(parsed)
226-
227212
results = []
228-
for output_module in output_modules:
229-
result = SubstreamOutput(module_name=output_module)
230-
data_dict: dict = raw_results.get(output_module)
231-
for k, v in data_dict.items():
232-
df = pd.DataFrame(v)
233-
df["output_module"] = output_module
234-
setattr(result, k, df)
235-
results.append(result)
236-
return results
213+
if callable(stream_callback):
214+
# This logic executes a function passed as a parameter on every block data has been streamed
215+
try:
216+
for response in stream:
217+
data = MessageToDict(response.data)
218+
if data:
219+
module_name: str = data["outputs"][0]["name"]
220+
if self.output_modules[module]["is_map"]:
221+
parsed = self._parse_data_outputs(data)
222+
else:
223+
parsed = self._parse_data_deltas(data)
224+
if len(parsed) > 0:
225+
stream_callback(module_name, parsed)
226+
except Exception as e:
227+
return {"error": e}
228+
return
229+
elif return_first_result is True:
230+
# This logic indexes all blocks until the first block that data is received
231+
# Ideal for live streaming events and receiveing them on the front end
232+
module_name = ""
233+
parsed = None
234+
data_block = 0
235+
try:
236+
for response in stream:
237+
data = MessageToDict(response.data)
238+
progress = MessageToDict(response.progress)
239+
if data:
240+
data_block = data["clock"]["number"]
241+
module_name: str = data["outputs"][0]["name"]
242+
if self.output_modules[module]["is_map"]:
243+
parsed = self._parse_data_outputs(data)
244+
else:
245+
parsed = self._parse_data_deltas(data)
246+
module_name: str = data["outputs"][0]["name"]
247+
if len(parsed) > 0:
248+
break
249+
elif progress and return_progress is True:
250+
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
251+
data_block = endBlock
252+
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
253+
return {"block": int(endBlock)}
254+
return {"data": parsed, "module_name": module_name, "data_block": data_block}
255+
except Exception as e:
256+
return {"error": e}
257+
else:
258+
# This logic indexes all blocks from start_block until end_block
259+
# Returns one single dataframe with all data in rows once all blocks have been indexed
260+
try:
261+
for response in stream:
262+
snapshot = MessageToDict(response.snapshot_data)
263+
data = MessageToDict(response.data)
264+
if snapshot:
265+
module_name: str = snapshot["moduleName"]
266+
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
267+
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
268+
if data:
269+
if self.output_modules[module]["is_map"]:
270+
parsed = self._parse_data_outputs(data)
271+
else:
272+
parsed = self._parse_data_deltas(data)
273+
module_name: str = data["outputs"][0]["name"]
274+
raw_results[module_name]["data"].extend(parsed)
275+
for output_module in output_modules:
276+
result = SubstreamOutput(module_name=output_module)
277+
data_dict: dict = raw_results.get(output_module)
278+
for k, v in data_dict.items():
279+
df = pd.DataFrame(v)
280+
df["output_module"] = output_module
281+
setattr(result, k, df)
282+
results.append(result)
283+
except Exception as e:
284+
results.append({"error": e})
285+
return results
286+

0 commit comments

Comments
 (0)