|
16 | 16 |
|
17 | 17 | DEFAULT_ENDPOINT = "api.streamingfast.io:443" |
18 | 18 |
|
19 | | - |
20 | 19 | def retrieve_class(module_name: str, class_name: str): |
21 | 20 | module = import_module(module_name) |
22 | 21 | return getattr(module, class_name) |
@@ -126,11 +125,10 @@ def _parse_snapshot_deltas(self, snapshot: dict) -> list[dict]: |
126 | 125 | for x in snapshot["deltas"].get("deltas", list()) |
127 | 126 | ] |
128 | 127 |
|
129 | | - def _parse_data_outputs(self, data: dict, module_names: list[str]) -> list[dict]: |
| 128 | + def _parse_data_outputs(self, data: dict, module_name: str) -> list[dict]: |
130 | 129 | outputs = list() |
131 | | - module_set = set(module_names) |
132 | 130 | for output in data["outputs"]: |
133 | | - if "mapOutput" not in output or output["name"] not in module_set: |
| 131 | + if "mapOutput" not in output or output["name"] != module_name: |
134 | 132 | continue |
135 | 133 | map_output = output["mapOutput"] |
136 | 134 | for key, items in map_output.items(): |
@@ -164,85 +162,90 @@ def proto_file_map(self) -> dict[str, DescriptorProto]: |
164 | 162 |
|
165 | 163 | def poll( |
166 | 164 | self, |
167 | | - output_modules: list[str], |
| 165 | + output_module: str, |
168 | 166 | start_block: int, |
169 | 167 | end_block: int, |
170 | | - stream_callback: Optional[callable] = None, |
171 | 168 | return_first_result: bool = False, |
172 | 169 | initial_snapshot: bool = False, |
173 | | - highest_processed_block: int = 0, |
174 | | - return_progress: bool = False, |
| 170 | + return_type: str = "df" |
175 | 171 | ): |
176 | | - from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request |
177 | | - for module in output_modules: |
178 | | - if module not in self.output_modules: |
179 | | - raise Exception(f"module '{module}' is not supported for {self.name}") |
180 | | - if self.output_modules[module].get('is_map') is False: |
181 | | - raise Exception(f"module '{module}' is not a map module") |
182 | | - self._class_from_module(module) |
183 | | - |
184 | | - stream = self.service.Blocks( |
185 | | - Request( |
186 | | - start_block_num=start_block, |
187 | | - stop_block_num=end_block, |
188 | | - fork_steps=[STEP_IRREVERSIBLE], |
189 | | - modules=self.pkg.modules, |
190 | | - output_modules=output_modules, |
191 | | - initial_store_snapshot_for_modules=output_modules |
192 | | - if initial_snapshot |
193 | | - else None, |
194 | | - ) |
195 | | - ) |
196 | | - raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) |
| 172 | + |
| 173 | + return_dict_interface = {"data": [], "module_name": output_module, "data_block": str(start_block), "error": None} |
| 174 | + valid_return_types = ["dict", "df"] |
197 | 175 | results = [] |
198 | | - data_block = 0 |
199 | | - module_name = "" |
| 176 | + raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()}) |
200 | 177 |
|
201 | 178 | try: |
| 179 | + if isinstance(output_module, str) is False: |
| 180 | + raise Exception("The 'output_module' parameter passed into the poll() function is not a string.") |
| 181 | + return_type = return_type.lower() |
| 182 | + if return_type not in valid_return_types: |
| 183 | + return_type = "df" |
| 184 | + |
| 185 | + from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request |
| 186 | + if output_module not in self.output_modules: |
| 187 | + raise Exception(f"module '{output_module}' is not supported for {self.name}") |
| 188 | + if self.output_modules[output_module].get('is_map') is False: |
| 189 | + raise Exception(f"module '{output_module}' is not a map module") |
| 190 | + self._class_from_module(output_module) |
| 191 | + |
| 192 | + stream = self.service.Blocks( |
| 193 | + Request( |
| 194 | + start_block_num=start_block, |
| 195 | + stop_block_num=end_block, |
| 196 | + fork_steps=[STEP_IRREVERSIBLE], |
| 197 | + modules=self.pkg.modules, |
| 198 | + output_modules=[output_module], |
| 199 | + initial_store_snapshot_for_modules=[output_module] |
| 200 | + if initial_snapshot |
| 201 | + else None, |
| 202 | + ) |
| 203 | + ) |
| 204 | + |
202 | 205 | for response in stream: |
203 | 206 | snapshot = MessageToDict(response.snapshot_data) |
204 | 207 | data = MessageToDict(response.data) |
205 | | - progress = MessageToDict(response.progress) |
206 | 208 | session = MessageToDict(response.session) |
207 | 209 |
|
208 | 210 | if session: |
209 | 211 | continue |
210 | 212 |
|
211 | 213 | if snapshot: |
212 | | - module_name = snapshot["moduleName"] |
213 | 214 | snapshot_deltas = self._parse_snapshot_deltas(snapshot) |
214 | | - raw_results[module_name]["snapshots"].extend(snapshot_deltas) |
| 215 | + raw_results[output_module]["snapshots"].extend(snapshot_deltas) |
215 | 216 |
|
216 | 217 | if data: |
217 | | - parsed = self._parse_data_outputs(data, output_modules) |
218 | | - module_name = data["outputs"][0]["name"] |
219 | | - raw_results[module_name]["data"].extend(parsed) |
220 | | - data_block = data["clock"]["number"] |
| 218 | + parsed = self._parse_data_outputs(data, output_module) |
| 219 | + raw_results[output_module]["data"].extend(parsed) |
| 220 | + return_dict_interface["data_block"] = data["clock"]["number"] |
221 | 221 | if len(parsed) > 0: |
222 | | - parsed = [dict(item, **{'block':data_block}) for item in parsed] |
| 222 | + parsed = [dict(item, **{'block':data["clock"]["number"]}) for item in parsed] |
223 | 223 | if return_first_result is True: |
224 | 224 | break |
225 | | - if callable(stream_callback): |
226 | | - stream_callback(module_name, parsed) |
227 | | - else: |
228 | | - continue |
229 | | - elif progress and return_progress is True: |
230 | | - if 'processedBytes' in progress["modules"][0] or 'processedRanges' not in progress["modules"][0]: |
231 | | - continue |
232 | | - endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock']) |
233 | | - data_block = endBlock |
234 | | - if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]: |
235 | | - return {"block": int(endBlock)} |
236 | | - if return_first_result is True: |
237 | | - return {"data": parsed, "module_name": module_name, "data_block": data_block} |
238 | | - for output_module in output_modules: |
| 225 | + elif int(return_dict_interface["data_block"]) + 1 == end_block: |
| 226 | + results = return_dict_interface |
| 227 | + |
| 228 | + if return_first_result is True and parsed: |
| 229 | + return_dict_interface["data"] = parsed |
| 230 | + if return_type == "dict": |
| 231 | + results = return_dict_interface |
| 232 | + if return_type == "df": |
| 233 | + results = pd.DataFrame(parsed) |
| 234 | + if return_first_result is False and raw_results: |
239 | 235 | result = SubstreamOutput(module_name=output_module) |
240 | 236 | data_dict: dict = raw_results.get(output_module) |
241 | 237 | for k, v in data_dict.items(): |
242 | 238 | df = pd.DataFrame(v) |
243 | 239 | df["output_module"] = output_module |
244 | 240 | setattr(result, k, df) |
245 | 241 | results.append(result) |
| 242 | + if return_type == "dict": |
| 243 | + return_dict_interface["data"] = results.to_dict() |
| 244 | + results = return_dict_interface |
246 | 245 | except Exception as err: |
247 | | - results = {"error": err} |
| 246 | + error_to_pass = err |
| 247 | + if isinstance(err, Exception): |
| 248 | + error_to_pass = str(err) |
| 249 | + return_dict_interface["error"] = error_to_pass |
| 250 | + results = return_dict_interface |
248 | 251 | return results |
0 commit comments