Skip to content

Commit ac4a163

Browse files
authored
refactor: remove unused async connect path and simplify response handling (#42)
1 parent 5d97f8e commit ac4a163

2 files changed

Lines changed: 6 additions & 157 deletions

File tree

drift/core/communication/communicator.py

Lines changed: 5 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -122,121 +122,6 @@ def _get_stack_trace(self) -> str:
122122

123123
# ========== Connection Methods ==========
124124

125-
async def connect(
126-
self,
127-
connection_info: dict[str, Any] | None = None,
128-
service_id: str = "",
129-
) -> None:
130-
"""Connect to the CLI and perform handshake.
131-
132-
Args:
133-
connection_info: Dict with 'socketPath' or 'host'/'port'
134-
service_id: Service identifier for the connection
135-
136-
Raises:
137-
ConnectionError: If connection fails
138-
TimeoutError: If connection times out
139-
"""
140-
# Determine address
141-
if connection_info:
142-
if "socketPath" in connection_info:
143-
address: tuple[str, int] | str = connection_info["socketPath"]
144-
else:
145-
address = (connection_info["host"], connection_info["port"])
146-
else:
147-
address = self._get_socket_address()
148-
149-
# Set calling_library_context to prevent socket instrumentation from flagging
150-
# our own socket operations as unpatched dependencies
151-
context_token = calling_library_context.set("ProtobufCommunicator")
152-
try:
153-
# Create appropriate socket type
154-
if isinstance(address, str):
155-
# Unix socket
156-
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
157-
logger.debug(f"Connecting to Unix socket: {address}")
158-
else:
159-
# TCP socket
160-
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
161-
logger.debug(f"Connecting to TCP: {address}")
162-
163-
self._socket.settimeout(self.config.connect_timeout)
164-
self._socket.connect(address)
165-
166-
conn_type = "Unix socket" if isinstance(address, str) else "TCP"
167-
logger.debug(f"Connected to CLI via protobuf ({conn_type})")
168-
169-
# Send connect message
170-
await self._send_connect_message(service_id)
171-
172-
self._connected = True
173-
174-
except TimeoutError as e:
175-
self._cleanup()
176-
raise TimeoutError(f"Connection timed out: {e}") from e
177-
except OSError as e:
178-
self._cleanup()
179-
raise ConnectionError(f"Socket error: {e}") from e
180-
finally:
181-
calling_library_context.reset(context_token)
182-
183-
async def _send_connect_message(self, service_id: str) -> None:
184-
"""Send the initial connection message to CLI and wait for acknowledgement."""
185-
connect_request = ConnectRequest(
186-
service_id=service_id,
187-
sdk_version=SDK_VERSION,
188-
min_cli_version=MIN_CLI_VERSION,
189-
)
190-
191-
request_id = self._generate_request_id()
192-
sdk_message = SdkMessage(
193-
type=MessageType.SDK_CONNECT,
194-
request_id=request_id,
195-
connect_request=connect_request.to_proto(),
196-
)
197-
198-
await self._send_protobuf_message(sdk_message)
199-
200-
# Wait for connect response from CLI
201-
await self._receive_connect_response(request_id)
202-
203-
async def _receive_connect_response(self, request_id: str) -> None:
204-
"""Wait for and handle the connect response from CLI."""
205-
if not self._socket:
206-
raise ConnectionError("Socket not initialized")
207-
208-
self._socket.settimeout(self.config.connect_timeout)
209-
210-
try:
211-
# Read length prefix
212-
length_data = self._recv_exact(4)
213-
if not length_data:
214-
raise ConnectionError("Connection closed by CLI")
215-
216-
length = struct.unpack(">I", length_data)[0]
217-
218-
# Read message data
219-
message_data = self._recv_exact(length)
220-
if not message_data:
221-
raise ConnectionError("Connection closed by CLI")
222-
223-
cli_message = CliMessage().parse(message_data)
224-
225-
logger.debug(f"Received connect response: type={cli_message.type}, requestId={cli_message.request_id}")
226-
227-
if cli_message.connect_response:
228-
response = cli_message.connect_response
229-
if response.success:
230-
logger.debug("CLI acknowledged connection successfully")
231-
else:
232-
error_msg = response.error or "Unknown error"
233-
raise ConnectionError(f"CLI rejected connection: {error_msg}")
234-
else:
235-
raise ConnectionError(f"Expected connect response but got message type: {cli_message.type}")
236-
237-
except TimeoutError as e:
238-
raise TimeoutError(f"Timeout waiting for connect response: {e}") from e
239-
240125
def connect_sync(
241126
self,
242127
connection_info: dict[str, Any] | None = None,
@@ -343,7 +228,7 @@ def connect_sync(
343228
finally:
344229
calling_library_context.reset(context_token)
345230

346-
async def disconnect(self) -> None:
231+
def disconnect(self) -> None:
347232
"""Disconnect from CLI."""
348233
self._cleanup()
349234
logger.debug("Disconnected from CLI")
@@ -559,51 +444,15 @@ async def _send_protobuf_message(self, message: SdkMessage) -> None:
559444
async def _receive_response(self, request_id: str) -> MockResponseOutput:
560445
"""Receive and parse a response for a specific request ID.
561446
562-
If the background reader is running, waits on an event for the response.
563-
Otherwise, reads directly from the socket (for async-only connections).
447+
Waits on an event for the background reader to deliver the response.
564448
"""
565449
if not self._socket:
566450
raise ConnectionError("Socket not initialized")
567451

568-
# If background reader is running, wait on event instead of reading socket
569-
if self._background_reader_thread and self._background_reader_thread.is_alive():
570-
return await self._wait_for_response_async(request_id)
571-
572-
# No background reader - read directly from socket (async connect path)
573-
self._socket.settimeout(self.config.request_timeout)
574-
575-
try:
576-
while True:
577-
# Read length prefix
578-
length_data = self._recv_exact(4)
579-
if not length_data:
580-
raise ConnectionError("Connection closed by CLI")
452+
if not self._background_reader_thread or not self._background_reader_thread.is_alive():
453+
raise ConnectionError("Background reader is not running - connection may have been closed")
581454

582-
length = struct.unpack(">I", length_data)[0]
583-
584-
# Read message data
585-
message_data = self._recv_exact(length)
586-
if not message_data:
587-
raise ConnectionError("Connection closed by CLI")
588-
589-
cli_message = CliMessage().parse(message_data)
590-
591-
logger.debug(f"Received CLI message type: {cli_message.type}, requestId: {cli_message.request_id}")
592-
593-
if cli_message.request_id == request_id:
594-
return self._handle_cli_message(cli_message)
595-
596-
if cli_message.connect_response:
597-
response = cli_message.connect_response
598-
if response.success:
599-
logger.debug("CLI acknowledged connection")
600-
# Note: session_id is not in the protobuf schema
601-
else:
602-
logger.error(f"CLI rejected connection: {response.error}")
603-
continue
604-
605-
except TimeoutError as e:
606-
raise TimeoutError(f"Request timed out: {e}") from e
455+
return await self._wait_for_response_async(request_id)
607456

608457
def _wait_for_response(self, request_id: str) -> MockResponseOutput:
609458
"""Wait for a response from the background reader thread.

drift/core/drift_sdk.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ def shutdown(self) -> None:
746746

747747
if self.communicator:
748748
try:
749-
asyncio.run(self.communicator.disconnect())
749+
self.communicator.disconnect()
750750
except Exception as e:
751751
logger.error(f"Error disconnecting from CLI: {e}")
752752

0 commit comments

Comments
 (0)