-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy path5_StreamOracleList.py
More file actions
41 lines (27 loc) · 1.02 KB
/
Copy path5_StreamOracleList.py
File metadata and controls
41 lines (27 loc) · 1.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
from typing import Any, Dict
from grpc import RpcError
from pyinjective.core.network import Network
from pyinjective.indexer_client import IndexerClient
async def oracle_list_event_processor(event: Dict[str, Any]):
print(event)
def stream_error_processor(exception: RpcError):
print(f"There was an error listening to oracle list updates ({exception})")
def stream_closed_processor():
print("The oracle list updates stream has been closed")
async def main() -> None:
network = Network.testnet()
client = IndexerClient(network)
task = asyncio.get_event_loop().create_task(
client.listen_oracle_list_updates(
callback=oracle_list_event_processor,
on_end_callback=stream_closed_processor,
on_status_callback=stream_error_processor,
oracle_type="provider",
symbols=["TIA"],
)
)
await asyncio.sleep(delay=60)
task.cancel()
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())