-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathstream_events.py
More file actions
39 lines (30 loc) · 1.28 KB
/
stream_events.py
File metadata and controls
39 lines (30 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
"""
Activity Stream - X API v2
==========================
Endpoint: GET https://api.x.com/2/activity/stream
Docs: https://docs.x.com/x-api/activity/introduction
Opens a persistent HTTP connection and streams real-time activity events
matching your active subscriptions. Events are delivered as they occur on
the platform — no polling required.
You must create at least one subscription (see create_subscription.py) before
events will be delivered to this stream.
Authentication: Bearer Token (App-only)
Required env vars: BEARER_TOKEN
"""
import os
import json
from xdk import Client
bearer_token = os.environ.get("BEARER_TOKEN")
client = Client(bearer_token=bearer_token)
def main():
print("Connecting to activity stream... (press Ctrl+C to stop)")
# The stream() method returns a generator that yields events as they arrive.
# The SDK manages reconnection with exponential backoff automatically.
for event in client.activity.stream():
# Access data attribute (model uses extra='allow' so data should be available)
# Use getattr with fallback in case data field is missing from response
event_data = getattr(event, 'data', None)
if event_data:
print(json.dumps(event_data, indent=4, sort_keys=True))
if __name__ == "__main__":
main()