Skip to content

Commit d5d5276

Browse files
committed
add changes to go & py clients
1 parent 8275f14 commit d5d5276

3 files changed

Lines changed: 118 additions & 10 deletions

File tree

go/modcdp/client/ModCDPClient.go

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const DefaultServiceWorkerReadyTimeoutMS = 60_000
5454
const DefaultServiceWorkerPollIntervalMS = 100
5555
const DefaultTargetSessionPollIntervalMS = 20
5656
const DefaultWSConnectErrorSettleTimeoutMS = 250
57+
const DefaultClientHeartbeatIntervalMS = 250
5758

5859
func boolPointer(value bool) *bool {
5960
return &value
@@ -180,14 +181,16 @@ func freePort() (int, error) {
180181
// --- public types --------------------------------------------------------
181182

182183
type ServerConfig struct {
183-
ServerLoopbackCDPURL string `json:"server_loopback_cdp_url,omitempty"`
184-
ServerRoutes map[string]string `json:"server_routes,omitempty"`
185-
ServerBrowserToken string `json:"server_browser_token,omitempty"`
186-
ServerCDPSendTimeoutMS int `json:"server_cdp_send_timeout_ms,omitempty"`
187-
ServerLoopbackExecutionContextTimeoutMS int `json:"server_loopback_execution_context_timeout_ms,omitempty"`
188-
ServerWSConnectErrorSettleTimeoutMS int `json:"server_ws_connect_error_settle_timeout_ms,omitempty"`
189-
Options map[string]any `json:"-"`
190-
disabled bool
184+
ServerLoopbackCDPURL string `json:"server_loopback_cdp_url,omitempty"`
185+
ServerRoutes map[string]string `json:"server_routes,omitempty"`
186+
ServerBrowserToken string `json:"server_browser_token,omitempty"`
187+
ServerCDPSendTimeoutMS int `json:"server_cdp_send_timeout_ms,omitempty"`
188+
ServerLoopbackExecutionContextTimeoutMS int `json:"server_loopback_execution_context_timeout_ms,omitempty"`
189+
ServerWSConnectErrorSettleTimeoutMS int `json:"server_ws_connect_error_settle_timeout_ms,omitempty"`
190+
ServerDownstreamClientTimeoutMS int `json:"server_downstream_client_timeout_ms,omitempty"`
191+
ServerCloseBrowserOnDownstreamDisconnect *bool `json:"server_close_browser_on_downstream_disconnect,omitempty"`
192+
Options map[string]any `json:"-"`
193+
disabled bool
191194
}
192195

193196
var ServerNone = &ServerConfig{disabled: true}
@@ -254,6 +257,7 @@ type ClientConfig struct {
254257
ClientMirrorUpstreamEvents *bool `json:"client_mirror_upstream_events,omitempty"`
255258
ClientCDPSendTimeoutMS int `json:"client_cdp_send_timeout_ms,omitempty"`
256259
ClientEventWaitTimeoutMS int `json:"client_event_wait_timeout_ms,omitempty"`
260+
ClientHeartbeatIntervalMS int `json:"client_heartbeat_interval_ms,omitempty"`
257261
}
258262

259263
type Options struct {
@@ -406,6 +410,7 @@ type ModCDPClient struct {
406410
launchedBrowser *LaunchedBrowser
407411
extensionInjectors []extensionInjector
408412
configuredPeerGeneration int64
413+
heartbeatStop chan struct{}
409414
}
410415

411416
type extensionInjector interface {
@@ -501,6 +506,9 @@ func New(opts Options) *ModCDPClient {
501506
if opts.Client.ClientEventWaitTimeoutMS == 0 {
502507
opts.Client.ClientEventWaitTimeoutMS = DefaultEventWaitTimeoutMS
503508
}
509+
if opts.Client.ClientHeartbeatIntervalMS == 0 {
510+
opts.Client.ClientHeartbeatIntervalMS = DefaultClientHeartbeatIntervalMS
511+
}
504512
if opts.Injector.InjectorExecutionContextTimeoutMS == 0 {
505513
opts.Injector.InjectorExecutionContextTimeoutMS = DefaultExecutionContextTimeoutMS
506514
}
@@ -575,7 +583,10 @@ func (c *ModCDPClient) Connect() error {
575583
return fmt.Errorf("upstream transport did not connect")
576584
}
577585
c.transport.OnRecv(func(message map[string]any) { c.handleMessage(message) })
578-
c.transport.OnClose(func(err error) { c.rejectAll(err) })
586+
c.transport.OnClose(func(err error) {
587+
c.stopHeartbeat()
588+
c.rejectAll(err)
589+
})
579590
if transportpkg.EndpointKindForUpstream(c.Upstream.UpstreamMode) == UpstreamEndpointKindModCDPServer {
580591
if err := c.transport.WaitForPeer(); err != nil {
581592
c.Close()
@@ -588,6 +599,7 @@ func (c *ModCDPClient) Connect() error {
588599
}
589600
c.configuredPeerGeneration = c.transport.PeerGeneration()
590601
}
602+
c.startHeartbeat()
591603
c.startPingLatencyMeasurement()
592604
connectedAt := time.Now().UnixMilli()
593605
c.ConnectTiming = map[string]any{
@@ -683,6 +695,7 @@ func (c *ModCDPClient) Connect() error {
683695
return fmt.Errorf("Mod.configure: %w", err)
684696
}
685697
}
698+
c.startHeartbeat()
686699
c.startPingLatencyMeasurement()
687700
connectedAt := time.Now().UnixMilli()
688701
c.ConnectTiming = map[string]any{
@@ -899,6 +912,7 @@ func (c *ModCDPClient) serverConfigureParams(customCommands []map[string]any, cu
899912
"server_cdp_send_timeout_ms": c.Client.ClientCDPSendTimeoutMS,
900913
"server_loopback_execution_context_timeout_ms": c.Injector.InjectorExecutionContextTimeoutMS,
901914
"server_ws_connect_error_settle_timeout_ms": c.Upstream.UpstreamWSConnectErrorSettleTimeoutMS,
915+
"server_downstream_client_timeout_ms": maxInt(c.Client.ClientHeartbeatIntervalMS*4, 1_000),
902916
}
903917
if c.Server != nil {
904918
server["server_loopback_cdp_url"] = c.Server.ServerLoopbackCDPURL
@@ -915,6 +929,12 @@ func (c *ModCDPClient) serverConfigureParams(customCommands []map[string]any, cu
915929
if c.Server.ServerWSConnectErrorSettleTimeoutMS != 0 {
916930
server["server_ws_connect_error_settle_timeout_ms"] = c.Server.ServerWSConnectErrorSettleTimeoutMS
917931
}
932+
if c.Server.ServerDownstreamClientTimeoutMS != 0 {
933+
server["server_downstream_client_timeout_ms"] = c.Server.ServerDownstreamClientTimeoutMS
934+
}
935+
if c.Server.ServerCloseBrowserOnDownstreamDisconnect != nil {
936+
server["server_close_browser_on_downstream_disconnect"] = *c.Server.ServerCloseBrowserOnDownstreamDisconnect
937+
}
918938
for key, value := range c.Server.Options {
919939
server[key] = value
920940
}
@@ -1410,6 +1430,7 @@ func handlerPointer(handler Handler) uintptr {
14101430
}
14111431

14121432
func (c *ModCDPClient) Close() {
1433+
c.stopHeartbeat()
14131434
if c.launchedBrowser != nil {
14141435
c.launchedBrowser.Close()
14151436
c.launchedBrowser = nil
@@ -1664,6 +1685,48 @@ func (c *ModCDPClient) startPingLatencyMeasurement() {
16641685
}()
16651686
}
16661687

1688+
func (c *ModCDPClient) startHeartbeat() {
1689+
c.stopHeartbeat()
1690+
if c.Server == nil || c.Server.ServerCloseBrowserOnDownstreamDisconnect == nil || !*c.Server.ServerCloseBrowserOnDownstreamDisconnect {
1691+
return
1692+
}
1693+
interval := c.Client.ClientHeartbeatIntervalMS
1694+
if interval <= 0 {
1695+
return
1696+
}
1697+
stop := make(chan struct{})
1698+
c.heartbeatStop = stop
1699+
go func() {
1700+
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
1701+
defer ticker.Stop()
1702+
for {
1703+
select {
1704+
case <-ticker.C:
1705+
if _, err := c.Send("Mod.ping", map[string]any{"sent_at": time.Now().UnixMilli()}); err != nil {
1706+
return
1707+
}
1708+
case <-stop:
1709+
return
1710+
}
1711+
}
1712+
}()
1713+
}
1714+
1715+
func (c *ModCDPClient) stopHeartbeat() {
1716+
if c.heartbeatStop == nil {
1717+
return
1718+
}
1719+
close(c.heartbeatStop)
1720+
c.heartbeatStop = nil
1721+
}
1722+
1723+
func maxInt(left int, right int) int {
1724+
if left > right {
1725+
return left
1726+
}
1727+
return right
1728+
}
1729+
16671730
func numberAsInt64(value any) (int64, bool) {
16681731
switch v := value.(type) {
16691732
case int64:

python/modcdp/client/ModCDPClient.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ def ping(self, **params: Any) -> AwaitableDict | AwaitableValue:
187187
DEFAULT_SERVICE_WORKER_POLL_INTERVAL_MS = 100
188188
DEFAULT_TARGET_SESSION_POLL_INTERVAL_MS = 20
189189
DEFAULT_WS_CONNECT_ERROR_SETTLE_TIMEOUT_MS = 250
190+
DEFAULT_CLIENT_HEARTBEAT_INTERVAL_MS = 250
190191

191192

192193
class _RawCDP:
@@ -313,6 +314,9 @@ def __init__(
313314
"client_mirror_upstream_events": bool(client_input.get("client_mirror_upstream_events", True)),
314315
"client_cdp_send_timeout_ms": int(_defaulted(client_input.get("client_cdp_send_timeout_ms"), DEFAULT_CDP_SEND_TIMEOUT_MS)),
315316
"client_event_wait_timeout_ms": int(_defaulted(client_input.get("client_event_wait_timeout_ms"), DEFAULT_EVENT_WAIT_TIMEOUT_MS)),
317+
"client_heartbeat_interval_ms": int(
318+
_defaulted(client_input.get("client_heartbeat_interval_ms"), DEFAULT_CLIENT_HEARTBEAT_INTERVAL_MS)
319+
),
316320
}
317321
self.cdp_url: str | None = cast(str | None, self.upstream.get("upstream_cdp_url"))
318322
if server is DEFAULT_SERVER:
@@ -362,6 +366,8 @@ def __init__(
362366
self._closed = False
363367
self._launched_browser: Any | None = None
364368
self._extension_injectors: list[ExtensionInjector] = []
369+
self._heartbeat_stop: threading.Event | None = None
370+
self._heartbeat_thread: threading.Thread | None = None
365371
self._cdp = _RawCDP(self)
366372
self._hydrate_native_protocol_schemas()
367373
self._hydrate_custom_surface()
@@ -374,13 +380,14 @@ def connect(self) -> "ModCDPClient":
374380
if self.transport is None:
375381
raise RuntimeError("upstream transport did not connect.")
376382
self.transport.onRecv(lambda message: self._on_recv(cast(CdpMessage, message)))
377-
self.transport.onClose(lambda error: self._reject_all(error))
383+
self.transport.onClose(lambda error: self._handle_transport_close(error))
378384

379385
if self.upstream_endpoint_kind == "modcdp_server":
380386
self.transport.waitForPeer()
381387
if self.server is not None:
382388
self._send_message("Mod.configure", cast(ProtocolParams, self._server_configure_params()))
383389
threading.Thread(target=self._measure_ping_latency, daemon=True).start()
390+
self._start_heartbeat()
384391
connected_at = int(time.time() * 1000)
385392
self.connect_timing = cast(ModCDPConnectTiming, {
386393
"started_at": connect_started_at,
@@ -420,6 +427,7 @@ def connect(self) -> "ModCDPClient":
420427
routes=cast(ModCDPRoutes, self.client["client_routes"]),
421428
cdp_session_id=self.ext_session_id,
422429
))
430+
self._start_heartbeat()
423431
threading.Thread(target=self._measure_ping_latency, daemon=True).start()
424432
connected_at = int(time.time() * 1000)
425433
self.connect_timing = cast(ModCDPConnectTiming, {
@@ -617,6 +625,10 @@ def _server_configure_params(self) -> ModCDPServerConfig:
617625
"server_ws_connect_error_settle_timeout_ms",
618626
self.upstream["upstream_ws_connect_error_settle_timeout_ms"],
619627
)
628+
server_downstream_client_timeout_ms = server.pop(
629+
"server_downstream_client_timeout_ms",
630+
max(int(self.client["client_heartbeat_interval_ms"]) * 4, 1_000),
631+
)
620632
custom_events: list[ModCDPAddCustomEventObjectParams] = []
621633
for event in self.custom_events:
622634
custom_events.append(
@@ -645,6 +657,7 @@ def _server_configure_params(self) -> ModCDPServerConfig:
645657
"server_cdp_send_timeout_ms": server_cdp_send_timeout_ms,
646658
"server_loopback_execution_context_timeout_ms": server_loopback_execution_context_timeout_ms,
647659
"server_ws_connect_error_settle_timeout_ms": server_ws_connect_error_settle_timeout_ms,
660+
"server_downstream_client_timeout_ms": server_downstream_client_timeout_ms,
648661
**({"server_routes": server_routes} if server_routes is not None else {}),
649662
**({"server_loopback_cdp_url": server_loopback_cdp_url} if server_loopback_cdp_url is not None else {}),
650663
**({"server_browser_token": server_browser_token} if server_browser_token is not None else {}),
@@ -659,6 +672,7 @@ def close(self) -> None:
659672
if self._closed:
660673
return
661674
self._closed = True
675+
self._stop_heartbeat()
662676
if self._launched_browser is not None:
663677
self._launched_browser["close"]()
664678
self._launched_browser = None
@@ -675,6 +689,35 @@ def close(self) -> None:
675689
pass
676690
self._extension_injectors = []
677691

692+
def _handle_transport_close(self, error: Exception) -> None:
693+
self._stop_heartbeat()
694+
self._reject_all(error)
695+
696+
def _start_heartbeat(self) -> None:
697+
self._stop_heartbeat()
698+
if not self.server or self.server.get("server_close_browser_on_downstream_disconnect") is not True:
699+
return
700+
interval_ms = int(self.client["client_heartbeat_interval_ms"])
701+
stop = threading.Event()
702+
self._heartbeat_stop = stop
703+
704+
def run() -> None:
705+
while not stop.wait(interval_ms / 1000):
706+
try:
707+
self.send("Mod.ping", {"sent_at": int(time.time() * 1000)})
708+
except Exception:
709+
return
710+
711+
self._heartbeat_thread = threading.Thread(target=run, daemon=True)
712+
self._heartbeat_thread.start()
713+
714+
def _stop_heartbeat(self) -> None:
715+
stop = self._heartbeat_stop
716+
self._heartbeat_stop = None
717+
if stop is not None:
718+
stop.set()
719+
self._heartbeat_thread = None
720+
678721
def _session_id_for_target(self, target_id: str, timeout: float = 0) -> str | None:
679722
if timeout <= 0:
680723
return self.auto_sessions.sessionIdForTarget(target_id)

python/modcdp/types/modcdp.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ class ModCDPServerConfig(TypedDict, total=False):
9292
server_cdp_send_timeout_ms: int
9393
server_loopback_execution_context_timeout_ms: int
9494
server_ws_connect_error_settle_timeout_ms: int
95+
server_downstream_client_timeout_ms: int
96+
server_close_browser_on_downstream_disconnect: bool
9597
server_browser_token: str | None
9698
custom_commands: list[ModCDPAddCustomCommandParams]
9799
custom_events: list[ModCDPAddCustomEventObjectParams]

0 commit comments

Comments
 (0)