Skip to content

Commit 2e7e6d1

Browse files
authored
Merge pull request #3 from messari/generalized-pb2-import
generalized pb2 module path
2 parents b09ebb7 + 08d0a6e commit 2e7e6d1

1 file changed

Lines changed: 15 additions & 10 deletions

File tree

substreams/substream.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22
import base64
3-
import os
3+
import os, sys
44
import subprocess
55
from collections import defaultdict
66
from dataclasses import dataclass
@@ -12,18 +12,19 @@
1212
import pandas as pd
1313
from google.protobuf.descriptor_pb2 import DescriptorProto
1414
from google.protobuf.json_format import MessageToDict
15+
from importlib import import_module
1516

1617
DEFAULT_ENDPOINT = "api.streamingfast.io:443"
1718

1819

1920
def retrieve_class(module_name: str, class_name: str):
20-
module = __import__(module_name)
21+
module = import_module(module_name)
2122
return getattr(module, class_name)
2223

2324

24-
def generate_pb2_files(spkg_path: str, commands: str) -> None:
25+
def generate_pb2_files(spkg_path: str, commands: str, out_path: str) -> None:
2526
command = f"""
26-
alias protogen_py="python3 -m grpc_tools.protoc --descriptor_set_in={spkg_path} --python_out=. --grpc_python_out=.";
27+
alias protogen_py="python3 -m grpc_tools.protoc --descriptor_set_in={spkg_path} --python_out={out_path} --grpc_python_out={out_path}";
2728
{commands}
2829
unalias protogen_py;
2930
"""
@@ -41,22 +42,27 @@ class SubstreamOutput:
4142

4243
class Substream:
4344
def __init__(
44-
self, spkg_path: str, token: Optional[str] = None, regenerate: bool = False
45+
self, spkg_path: str, token: Optional[str] = None, regenerate: bool = False, sf_out_dir: str = '.'
4546
):
4647
self.token: Optional[str] = os.getenv("SUBSTREAMS_API_TOKEN", None) or token
48+
sf_dir_path = os.path.join(sf_out_dir, 'sf')
49+
if not Path(sf_out_dir).exists():
50+
os.makedirs(sf_out_dir)
4751
if not self.token:
4852
raise Exception("Must set SUBSTREAMS_API_TOKEN")
4953
if not Path(spkg_path).exists() or not spkg_path.endswith(".spkg"):
5054
raise Exception("Must provide a valid .spkg file!")
51-
if not Path("sf/substreams").exists() or regenerate:
55+
if not Path(sf_dir_path).exists() or regenerate:
5256
# generate sf/ directory
5357
commands = """
5458
protogen_py sf/substreams/v1/substreams.proto;
5559
protogen_py sf/substreams/v1/package.proto;
5660
protogen_py sf/substreams/v1/modules.proto;
5761
protogen_py sf/substreams/v1/clock.proto;
5862
"""
59-
generate_pb2_files(spkg_path, commands)
63+
generate_pb2_files(spkg_path, commands, out_path=sf_out_dir)
64+
65+
sys.path.append(sf_out_dir)
6066

6167
from sf.substreams.v1.package_pb2 import Package
6268
from sf.substreams.v1.substreams_pb2_grpc import StreamStub
@@ -72,7 +78,7 @@ def __init__(
7278
if not file.startswith("sf/") and not file.startswith("google/")
7379
]
7480
)
75-
generate_pb2_files(spkg_path, custom_proto_files)
81+
generate_pb2_files(spkg_path, custom_proto_files, out_path=sf_out_dir)
7682

7783
credentials = grpc.composite_channel_credentials(
7884
grpc.ssl_channel_credentials(),
@@ -95,8 +101,7 @@ def _class_from_module(self, module_name: str):
95101
raw_module_path: str = self.proto_file_map.get(output_type)
96102
if raw_module_path is None:
97103
return None
98-
module_path: str = raw_module_path.split("/")[-1].split(".proto")[0]
99-
pb2_path: str = f"{module_path}_pb2"
104+
pb2_path: str = raw_module_path.replace('.proto', '_pb2').replace('/', '.')
100105
return retrieve_class(pb2_path, output_type)
101106

102107
def _parse_from_string(self, raw: str, key: str, output_class) -> dict:

0 commit comments

Comments
 (0)