Skip to content

Commit 79a1216

Browse files
authored
Merge pull request #5 from MichaelC1999/Refactor
Refactor and condense poll function
2 parents 8b87893 + 667576e commit 79a1216

3 files changed

Lines changed: 42 additions & 74 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "substreams"
7-
version = "0.0.5"
7+
version = "0.0.6"
88
authors = [
99
{ name="Ryan Sudhakaran", email="ryan.sudhakaran@messari.io" },
1010
{ name="Michael Carroll", email="michaelcarroll1999@gmail.com" },

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
setup(
77
name="substreams",
8-
version="0.0.5",
8+
version="0.0.6",
99
packages=[".substreams"],
1010
author="Ryan Sudhakaran",
1111
author_email="ryan.sudhakaran@messari.io",

substreams/substream.py

Lines changed: 40 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -210,77 +210,45 @@ def poll(
210210
)
211211
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
212212
results = []
213-
if callable(stream_callback):
214-
# This logic executes a function passed as a parameter on every block data has been streamed
215-
try:
216-
for response in stream:
217-
data = MessageToDict(response.data)
218-
if data:
219-
module_name: str = data["outputs"][0]["name"]
220-
if self.output_modules[module]["is_map"]:
221-
parsed = self._parse_data_outputs(data)
222-
else:
223-
parsed = self._parse_data_deltas(data)
224-
if len(parsed) > 0:
225-
stream_callback(module_name, parsed)
226-
except Exception as e:
227-
return {"error": e}
228-
return
229-
elif return_first_result is True:
230-
# This logic indexes all blocks until the first block that data is received
231-
# Ideal for live streaming events and receiveing them on the front end
232-
module_name = ""
233-
parsed = None
234-
data_block = 0
235-
try:
236-
for response in stream:
237-
data = MessageToDict(response.data)
238-
progress = MessageToDict(response.progress)
239-
if data:
240-
data_block = data["clock"]["number"]
241-
module_name: str = data["outputs"][0]["name"]
242-
if self.output_modules[module]["is_map"]:
243-
parsed = self._parse_data_outputs(data)
244-
else:
245-
parsed = self._parse_data_deltas(data)
246-
module_name: str = data["outputs"][0]["name"]
247-
if len(parsed) > 0:
213+
data_block = 0
214+
module_name: str = ""
215+
try:
216+
for response in stream:
217+
snapshot = MessageToDict(response.snapshot_data)
218+
data = MessageToDict(response.data)
219+
progress = MessageToDict(response.progress)
220+
if snapshot:
221+
module_name = snapshot["moduleName"]
222+
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
223+
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
224+
if data:
225+
if self.output_modules[module]["is_map"]:
226+
parsed = self._parse_data_outputs(data)
227+
else:
228+
parsed = self._parse_data_deltas(data)
229+
module_name = data["outputs"][0]["name"]
230+
raw_results[module_name]["data"].extend(parsed)
231+
data_block = data["clock"]["number"]
232+
if len(parsed) > 0:
233+
if return_first_result is True:
248234
break
249-
elif progress and return_progress is True:
250-
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
251-
data_block = endBlock
252-
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
253-
return {"block": int(endBlock)}
235+
if callable(stream_callback):
236+
stream_callback(module_name, parsed)
237+
elif progress and return_progress is True:
238+
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
239+
data_block = endBlock
240+
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
241+
return {"block": int(endBlock)}
242+
if return_first_result is True:
254243
return {"data": parsed, "module_name": module_name, "data_block": data_block}
255-
except Exception as e:
256-
return {"error": e}
257-
else:
258-
# This logic indexes all blocks from start_block until end_block
259-
# Returns one single dataframe with all data in rows once all blocks have been indexed
260-
try:
261-
for response in stream:
262-
snapshot = MessageToDict(response.snapshot_data)
263-
data = MessageToDict(response.data)
264-
if snapshot:
265-
module_name: str = snapshot["moduleName"]
266-
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
267-
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
268-
if data:
269-
if self.output_modules[module]["is_map"]:
270-
parsed = self._parse_data_outputs(data)
271-
else:
272-
parsed = self._parse_data_deltas(data)
273-
module_name: str = data["outputs"][0]["name"]
274-
raw_results[module_name]["data"].extend(parsed)
275-
for output_module in output_modules:
276-
result = SubstreamOutput(module_name=output_module)
277-
data_dict: dict = raw_results.get(output_module)
278-
for k, v in data_dict.items():
279-
df = pd.DataFrame(v)
280-
df["output_module"] = output_module
281-
setattr(result, k, df)
282-
results.append(result)
283-
except Exception as e:
284-
results.append({"error": e})
285-
return results
286-
244+
for output_module in output_modules:
245+
result = SubstreamOutput(module_name=output_module)
246+
data_dict: dict = raw_results.get(output_module)
247+
for k, v in data_dict.items():
248+
df = pd.DataFrame(v)
249+
df["output_module"] = output_module
250+
setattr(result, k, df)
251+
results.append(result)
252+
except Exception as e:
253+
results.append({"error": e})
254+
return results

0 commit comments

Comments
 (0)