Skip to content

Commit a5aba3f

Browse files
add pre-request time travel support
1 parent 75a5277 commit a5aba3f

6 files changed

Lines changed: 1034 additions & 851 deletions

File tree

drift/core/communication/communicator.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
MessageType,
2424
MockRequestInput,
2525
MockResponseOutput,
26+
Runtime,
2627
SdkMessage,
2728
SendAlertRequest,
2829
SendInboundSpanForReplayRequest,
30+
SetTimeTravelRequest,
31+
SetTimeTravelResponse,
2932
UnpatchedDependencyAlert,
3033
span_to_proto,
3134
)
@@ -78,6 +81,8 @@ def __init__(self, config: CommunicatorConfig | None = None) -> None:
7881
self._incoming_buffer = bytearray()
7982
self._pending_requests: dict[str, dict[str, Any]] = {}
8083
self._lock = threading.Lock()
84+
self._background_reader_thread: threading.Thread | None = None
85+
self._stop_background_reader = threading.Event()
8186

8287
@property
8388
def is_connected(self) -> bool:
@@ -321,6 +326,9 @@ def connect_sync(
321326
self._connected = True
322327
logger.info(f"[CONNECT_SYNC] Connection successful! Socket is: {self._socket}")
323328
logger.info(f"[CONNECT_SYNC] _connected={self._connected}, is_connected={self.is_connected}")
329+
330+
# Start background reader for CLI-initiated messages (like SetTimeTravel)
331+
self._start_background_reader()
324332
else:
325333
error_msg = response.error or "Unknown error"
326334
raise ConnectionError(f"CLI rejected connection: {error_msg}")
@@ -770,6 +778,12 @@ def _cleanup(self) -> None:
770778
logger.warning("[CLEANUP] _cleanup() called! Stack trace:")
771779
logger.warning("".join(traceback.format_stack()))
772780

781+
# Stop background reader thread
782+
self._stop_background_reader.set()
783+
if self._background_reader_thread and self._background_reader_thread.is_alive():
784+
self._background_reader_thread.join(timeout=1.0)
785+
self._background_reader_thread = None
786+
773787
self._connected = False
774788
self._session_id = None
775789
self._incoming_buffer.clear()
@@ -783,3 +797,116 @@ def _cleanup(self) -> None:
783797
self._socket = None
784798

785799
self._pending_requests.clear()
800+
801+
# ========== Background Reader for CLI-initiated Messages ==========
802+
803+
def _start_background_reader(self) -> None:
804+
"""Start background thread to read CLI-initiated messages."""
805+
if self._background_reader_thread and self._background_reader_thread.is_alive():
806+
return
807+
808+
self._stop_background_reader.clear()
809+
self._background_reader_thread = threading.Thread(
810+
target=self._background_read_loop,
811+
daemon=True,
812+
name="CLI-Message-Reader",
813+
)
814+
self._background_reader_thread.start()
815+
logger.debug("Started background reader thread for CLI-initiated messages")
816+
817+
def _background_read_loop(self) -> None:
818+
"""Background loop to read and handle CLI-initiated messages."""
819+
while not self._stop_background_reader.is_set():
820+
if not self._socket:
821+
break
822+
823+
try:
824+
# Set a short timeout so we can check the stop event periodically
825+
self._socket.settimeout(0.5)
826+
827+
# Try to read length prefix
828+
try:
829+
length_data = self._recv_exact(4)
830+
except socket.timeout:
831+
continue # No data available, check stop event and retry
832+
except Exception:
833+
continue
834+
835+
if not length_data:
836+
continue
837+
838+
length = struct.unpack(">I", length_data)[0]
839+
840+
# Read message data
841+
self._socket.settimeout(5.0) # Longer timeout for message body
842+
message_data = self._recv_exact(length)
843+
if not message_data:
844+
continue
845+
846+
# Parse message
847+
cli_message = CliMessage().parse(message_data)
848+
logger.debug(f"Background reader received message type: {cli_message.type}")
849+
850+
# Handle CLI-initiated messages based on message type
851+
if cli_message.type == MessageType.SET_TIME_TRAVEL:
852+
self._handle_set_time_travel_sync(cli_message)
853+
else:
854+
# Other message types (responses to SDK requests) are handled elsewhere
855+
logger.debug(f"Background reader ignoring message type: {cli_message.type}")
856+
857+
except socket.timeout:
858+
continue # Normal timeout, just retry
859+
except Exception as e:
860+
if not self._stop_background_reader.is_set():
861+
logger.debug(f"Background reader error: {e}")
862+
break
863+
864+
logger.debug("Background reader thread stopped")
865+
866+
def _handle_set_time_travel_sync(self, cli_message: CliMessage) -> None:
867+
"""Handle SetTimeTravel request from CLI and send response."""
868+
request = cli_message.set_time_travel_request
869+
if not request:
870+
return
871+
872+
logger.debug(
873+
f"Received SetTimeTravel request: timestamp={request.timestamp_seconds}, "
874+
f"traceId={request.trace_id}, source={request.timestamp_source}"
875+
)
876+
877+
try:
878+
from drift.instrumentation.datetime.instrumentation import start_time_travel
879+
880+
success = start_time_travel(request.timestamp_seconds, request.trace_id)
881+
882+
response = SetTimeTravelResponse(
883+
success=success,
884+
error="" if success else "time-machine library not available or failed to start",
885+
)
886+
except Exception as e:
887+
logger.error(f"Failed to set time travel: {e}")
888+
response = SetTimeTravelResponse(success=False, error=str(e))
889+
890+
# Send response back to CLI
891+
sdk_message = SdkMessage(
892+
type=MessageType.SET_TIME_TRAVEL,
893+
request_id=cli_message.request_id,
894+
set_time_travel_response=response,
895+
)
896+
897+
try:
898+
self._send_message_sync(sdk_message)
899+
logger.debug(f"Sent SetTimeTravel response: success={response.success}")
900+
except Exception as e:
901+
logger.error(f"Failed to send SetTimeTravel response: {e}")
902+
903+
def _send_message_sync(self, message: SdkMessage) -> None:
904+
"""Send a message synchronously on the main socket."""
905+
if not self._socket:
906+
raise ConnectionError("Not connected to CLI")
907+
908+
message_bytes = bytes(message)
909+
length_prefix = struct.pack(">I", len(message_bytes))
910+
911+
with self._lock:
912+
self._socket.sendall(length_prefix + message_bytes)

drift/core/communication/types.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
"CliMessage",
1717
"InstrumentationVersionMismatchAlert",
1818
"MessageType",
19+
"Runtime",
1920
"SdkMessage",
2021
"SendAlertRequest",
2122
"SendInboundSpanForReplayRequest",
23+
"SetTimeTravelRequest",
24+
"SetTimeTravelResponse",
2225
"UnpatchedDependencyAlert",
2326
# Aliases
2427
"SDKMessageType",
@@ -43,9 +46,12 @@
4346
CliMessage,
4447
InstrumentationVersionMismatchAlert,
4548
MessageType,
49+
Runtime,
4650
SdkMessage,
4751
SendAlertRequest,
4852
SendInboundSpanForReplayRequest,
53+
SetTimeTravelRequest,
54+
SetTimeTravelResponse,
4955
UnpatchedDependencyAlert,
5056
)
5157
from tusk.drift.core.v1 import (
@@ -136,6 +142,9 @@ class ConnectRequest:
136142
metadata: dict[str, str] = field(default_factory=dict)
137143
"""Additional metadata."""
138144

145+
runtime: Runtime = Runtime.PYTHON
146+
"""SDK runtime environment (node, python)."""
147+
139148
def to_proto(self) -> ProtoConnectRequest:
140149
"""Convert to protobuf message."""
141150
from betterproto.lib.google.protobuf import Struct
@@ -150,6 +159,7 @@ def to_proto(self) -> ProtoConnectRequest:
150159
sdk_version=self.sdk_version,
151160
min_cli_version=self.min_cli_version,
152161
metadata=metadata_struct,
162+
runtime=self.runtime,
153163
)
154164

155165

drift/core/drift_sdk.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,6 @@ def initialize(
241241
"Continuing with SDK initialization. Mock responses will not be available until CLI is running."
242242
)
243243

244-
# # TEMPORARY: Hardcoded time travel for testing
245-
# from drift.instrumentation.datetime import start_time_travel
246-
# # Unix timestamp: seconds + nanos/1e9 + 60 seconds
247-
# timestamp = 1768272255.864658
248-
# start_time_travel(timestamp)
249-
# logger.info(f"HARDCODED time travel to unix timestamp: {timestamp}")
250-
251244
install_hooks()
252245

253246
instance._init_auto_instrumentations()
@@ -474,12 +467,12 @@ def _init_auto_instrumentations(self) -> None:
474467
except Exception as e:
475468
logger.debug(f"Socket instrumentation initialization failed: {e}")
476469

477-
try:
478-
from ..instrumentation.kinde import KindeInstrumentation
479-
_ = KindeInstrumentation(enabled=True)
480-
logger.debug("Kinde instrumentation initialized (REPLAY mode - auth token validation)")
481-
except Exception as e:
482-
logger.debug(f"Kinde instrumentation initialization failed: {e}")
470+
# try:
471+
# from ..instrumentation.kinde import KindeInstrumentation
472+
# _ = KindeInstrumentation(enabled=True)
473+
# logger.debug("Kinde instrumentation initialized (REPLAY mode - auth token validation)")
474+
# except Exception as e:
475+
# logger.debug(f"Kinde instrumentation initialization failed: {e}")
483476

484477
def create_env_vars_snapshot(self) -> None:
485478
"""Create a span capturing all environment variables.

drift/core/mock_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ def _update_time_travel(
281281
mock_response: The mock response containing the timestamp
282282
replay_trace_id: The replay trace ID for this session
283283
"""
284+
if not replay_trace_id:
285+
return
286+
284287
try:
285288
from drift.instrumentation.datetime import start_time_travel
286289

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ dependencies = [
2727
"protobuf>=6.0",
2828
"PyYAML>=6.0",
2929
"requests>=2.32.5",
30-
"tusk-drift-schemas>=0.1.9.dev1",
30+
"tusk-drift-schemas @ file:///Users/sohankshirsagar/Desktop/Playground/tusk-drift-container/tusk-drift-schemas",
3131
"aiohttp>=3.9.0",
3232
"aiofiles>=23.0.0",
3333
"opentelemetry-api>=1.20.0",

0 commit comments

Comments
 (0)