diff --git a/packages/client-proxy/internal/cfg/model_test.go b/packages/client-proxy/internal/cfg/model_test.go new file mode 100644 index 0000000000..38d61d4fa4 --- /dev/null +++ b/packages/client-proxy/internal/cfg/model_test.go @@ -0,0 +1,70 @@ +package cfg + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParse_Defaults(t *testing.T) { + t.Setenv("HEALTH_PORT", "") + t.Setenv("PROXY_PORT", "") + t.Setenv("REDIS_URL", "") + t.Setenv("REDIS_CLUSTER_URL", "") + t.Setenv("REDIS_TLS_CA_BASE64", "") + t.Setenv("REDIS_POOL_SIZE", "") + t.Setenv("API_INTERNAL_GRPC_ADDRESS", "") + t.Setenv("API_EDGE_GRPC_ADDRESS", "") + t.Setenv("API_EDGE_GRPC_OAUTH_CLIENT_ID", "") + t.Setenv("API_EDGE_GRPC_OAUTH_CLIENT_SECRET", "") + t.Setenv("API_EDGE_GRPC_OAUTH_TOKEN_URL", "") + + cfg, err := Parse() + require.NoError(t, err) + require.EqualValues(t, 3003, cfg.HealthPort) + require.EqualValues(t, 3002, cfg.ProxyPort) + require.Equal(t, 40, cfg.RedisPoolSize) + require.Empty(t, cfg.RedisURL) + require.Empty(t, cfg.RedisClusterURL) + require.Empty(t, cfg.RedisTLSCABase64) + require.Empty(t, cfg.APIInternalGRPCAddress) + require.Empty(t, cfg.APIEdgeGRPCAddress) + require.Empty(t, cfg.APIEdgeGRPCOAuthClientID) + require.Empty(t, cfg.APIEdgeGRPCOAuthClientSecret) + require.Empty(t, cfg.APIEdgeGRPCOAuthTokenURL) +} + +func TestParse_OverridesFromEnv(t *testing.T) { + t.Setenv("HEALTH_PORT", "9001") + t.Setenv("PROXY_PORT", "9002") + t.Setenv("REDIS_URL", "redis://localhost:6379") + t.Setenv("REDIS_CLUSTER_URL", "redis://cluster:6379") + t.Setenv("REDIS_TLS_CA_BASE64", "Y2EtZGF0YQ==") + t.Setenv("REDIS_POOL_SIZE", "12") + t.Setenv("API_INTERNAL_GRPC_ADDRESS", "internal:5005") + t.Setenv("API_EDGE_GRPC_ADDRESS", "edge:5006") + t.Setenv("API_EDGE_GRPC_OAUTH_CLIENT_ID", "client-id") + t.Setenv("API_EDGE_GRPC_OAUTH_CLIENT_SECRET", "client-secret") + t.Setenv("API_EDGE_GRPC_OAUTH_TOKEN_URL", "https://tokens.example.com") + + cfg, err := Parse() + 
require.NoError(t, err) + require.EqualValues(t, 9001, cfg.HealthPort) + require.EqualValues(t, 9002, cfg.ProxyPort) + require.Equal(t, "redis://localhost:6379", cfg.RedisURL) + require.Equal(t, "redis://cluster:6379", cfg.RedisClusterURL) + require.Equal(t, "Y2EtZGF0YQ==", cfg.RedisTLSCABase64) + require.Equal(t, 12, cfg.RedisPoolSize) + require.Equal(t, "internal:5005", cfg.APIInternalGRPCAddress) + require.Equal(t, "edge:5006", cfg.APIEdgeGRPCAddress) + require.Equal(t, "client-id", cfg.APIEdgeGRPCOAuthClientID) + require.Equal(t, "client-secret", cfg.APIEdgeGRPCOAuthClientSecret) + require.Equal(t, "https://tokens.example.com", cfg.APIEdgeGRPCOAuthTokenURL) +} + +func TestParse_InvalidIntegerReturnsError(t *testing.T) { + t.Setenv("HEALTH_PORT", "not-a-number") + + _, err := Parse() + require.Error(t, err) +} diff --git a/packages/client-proxy/internal/info_test.go b/packages/client-proxy/internal/info_test.go new file mode 100644 index 0000000000..adefd6b47d --- /dev/null +++ b/packages/client-proxy/internal/info_test.go @@ -0,0 +1,66 @@ +package internal + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestServiceInfo_DefaultStatusIsZeroValue(t *testing.T) { + t.Parallel() + + s := &ServiceInfo{} + require.Equal(t, ServiceHealth(""), s.GetStatus()) +} + +func TestServiceInfo_SetAndGetStatus(t *testing.T) { + t.Parallel() + + s := &ServiceInfo{} + ctx := t.Context() + + s.SetStatus(ctx, Healthy) + require.Equal(t, Healthy, s.GetStatus()) + + s.SetStatus(ctx, Draining) + require.Equal(t, Draining, s.GetStatus()) + + s.SetStatus(ctx, Unhealthy) + require.Equal(t, Unhealthy, s.GetStatus()) +} + +func TestServiceInfo_SetSameStatusIsIdempotent(t *testing.T) { + t.Parallel() + + s := &ServiceInfo{} + ctx := t.Context() + + s.SetStatus(ctx, Healthy) + s.SetStatus(ctx, Healthy) + require.Equal(t, Healthy, s.GetStatus()) +} + +func TestServiceInfo_ConcurrentAccess(t *testing.T) { + t.Parallel() + + s := &ServiceInfo{} + ctx := 
t.Context() + statuses := []ServiceHealth{Healthy, Draining, Unhealthy} + + var wg sync.WaitGroup + for i := range 50 { + wg.Add(2) + go func(idx int) { + defer wg.Done() + s.SetStatus(ctx, statuses[idx%len(statuses)]) + }(i) + go func() { + defer wg.Done() + _ = s.GetStatus() + }() + } + wg.Wait() + + require.Contains(t, statuses, s.GetStatus()) +} diff --git a/packages/client-proxy/internal/proxy/paused_sandbox_resumer_grpc_test.go b/packages/client-proxy/internal/proxy/paused_sandbox_resumer_grpc_test.go new file mode 100644 index 0000000000..4af4720c34 --- /dev/null +++ b/packages/client-proxy/internal/proxy/paused_sandbox_resumer_grpc_test.go @@ -0,0 +1,245 @@ +package proxy + +import ( + "context" + "errors" + "net" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + proxygrpc "github.com/e2b-dev/infra/packages/shared/pkg/grpc/proxy" +) + +type fakeSandboxServer struct { + proxygrpc.UnimplementedSandboxServiceServer + + resp *proxygrpc.SandboxResumeResponse + err error + + gotSandboxID string + gotMetadata metadata.MD + calls atomic.Int32 +} + +func (f *fakeSandboxServer) ResumeSandbox(ctx context.Context, req *proxygrpc.SandboxResumeRequest) (*proxygrpc.SandboxResumeResponse, error) { + f.calls.Add(1) + f.gotSandboxID = req.GetSandboxId() + if md, ok := metadata.FromIncomingContext(ctx); ok { + f.gotMetadata = md + } + + if f.err != nil { + return nil, f.err + } + + return f.resp, nil +} + +func startFakeServer(t *testing.T, srv proxygrpc.SandboxServiceServer) *grpc.ClientConn { + t.Helper() + + listener := bufconn.Listen(1024 * 1024) + server := grpc.NewServer() + proxygrpc.RegisterSandboxServiceServer(server, srv) + + go func() { + _ = server.Serve(listener) + }() + t.Cleanup(server.Stop) + + conn, err := grpc.NewClient( 
+ "passthrough:///bufnet", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return listener.DialContext(ctx) + }), + ) + require.NoError(t, err) + t.Cleanup(func() { + _ = conn.Close() + }) + + return conn +} + +func TestNewGRPCPausedSandboxResumer_EmptyAddressErrors(t *testing.T) { + t.Parallel() + + r, err := NewGRPCPausedSandboxResumer(t.Context(), " ", GRPCOAuthConfig{}, false) + require.Error(t, err) + require.Nil(t, r) +} + +func TestNewGRPCPausedSandboxResumer_OAuthMisconfiguredErrors(t *testing.T) { + t.Parallel() + + r, err := NewGRPCPausedSandboxResumer(t.Context(), "127.0.0.1:1234", GRPCOAuthConfig{ClientID: "only-id"}, false) + require.Error(t, err) + require.Nil(t, r) +} + +func TestNewGRPCPausedSandboxResumer_InsecureSucceeds(t *testing.T) { + t.Parallel() + + r, err := NewGRPCPausedSandboxResumer(t.Context(), "127.0.0.1:1234", GRPCOAuthConfig{}, false) + require.NoError(t, err) + require.NotNil(t, r) + + closer, ok := r.(interface { + Close(ctx context.Context) error + }) + require.True(t, ok) + require.NoError(t, closer.Close(t.Context())) +} + +func TestNewGRPCPausedSandboxResumer_TLSSucceeds(t *testing.T) { + t.Parallel() + + r, err := NewGRPCPausedSandboxResumer(t.Context(), "127.0.0.1:1234", GRPCOAuthConfig{}, true) + require.NoError(t, err) + require.NotNil(t, r) + + closer, ok := r.(interface { + Close(ctx context.Context) error + }) + require.True(t, ok) + require.NoError(t, closer.Close(t.Context())) +} + +func TestGRPCPausedSandboxResumer_ResumeSendsMetadataAndReturnsIP(t *testing.T) { + t.Parallel() + + srv := &fakeSandboxServer{ + resp: &proxygrpc.SandboxResumeResponse{OrchestratorIp: " 10.0.0.5 "}, + } + conn := startFakeServer(t, srv) + + r := &grpcPausedSandboxResumer{ + conn: conn, + client: proxygrpc.NewSandboxServiceClient(conn), + auth: noopGrpcResumeAuth{}, + } + + ip, err := r.Resume(t.Context(), "sbx-123", 49983, "traffic-token", 
"envd-token") + require.NoError(t, err) + require.Equal(t, "10.0.0.5", ip) + require.EqualValues(t, 1, srv.calls.Load()) + require.Equal(t, "sbx-123", srv.gotSandboxID) + require.Equal(t, []string{"49983"}, srv.gotMetadata.Get(proxygrpc.MetadataSandboxRequestPort)) + require.Equal(t, []string{"traffic-token"}, srv.gotMetadata.Get(proxygrpc.MetadataTrafficAccessToken)) + require.Equal(t, []string{"envd-token"}, srv.gotMetadata.Get(proxygrpc.MetadataEnvdAccessToken)) +} + +func TestGRPCPausedSandboxResumer_ResumeOmitsEmptyTokens(t *testing.T) { + t.Parallel() + + srv := &fakeSandboxServer{ + resp: &proxygrpc.SandboxResumeResponse{OrchestratorIp: "10.0.0.6"}, + } + conn := startFakeServer(t, srv) + + r := &grpcPausedSandboxResumer{ + conn: conn, + client: proxygrpc.NewSandboxServiceClient(conn), + auth: noopGrpcResumeAuth{}, + } + + ip, err := r.Resume(t.Context(), "sbx-456", 8080, "", "") + require.NoError(t, err) + require.Equal(t, "10.0.0.6", ip) + require.EqualValues(t, 1, srv.calls.Load()) + require.Empty(t, srv.gotMetadata.Get(proxygrpc.MetadataTrafficAccessToken)) + require.Empty(t, srv.gotMetadata.Get(proxygrpc.MetadataEnvdAccessToken)) + require.Equal(t, []string{"8080"}, srv.gotMetadata.Get(proxygrpc.MetadataSandboxRequestPort)) +} + +func TestGRPCPausedSandboxResumer_ResumeReturnsServerError(t *testing.T) { + t.Parallel() + + srv := &fakeSandboxServer{ + err: status.Error(codes.NotFound, "missing"), + } + conn := startFakeServer(t, srv) + + r := &grpcPausedSandboxResumer{ + conn: conn, + client: proxygrpc.NewSandboxServiceClient(conn), + auth: noopGrpcResumeAuth{}, + } + + ip, err := r.Resume(t.Context(), "sbx", 80, "", "") + require.Error(t, err) + require.Empty(t, ip) + st, ok := status.FromError(errors.Unwrap(err)) + require.True(t, ok) + require.Equal(t, codes.NotFound, st.Code()) + require.EqualValues(t, 1, srv.calls.Load()) +} + +type errAuth struct { + err error +} + +func (e errAuth) authorize(_ context.Context) (context.Context, error) { + return 
nil, e.err +} + +func TestGRPCPausedSandboxResumer_ResumeAuthError(t *testing.T) { + t.Parallel() + + srv := &fakeSandboxServer{ + resp: &proxygrpc.SandboxResumeResponse{OrchestratorIp: "10.0.0.7"}, + } + conn := startFakeServer(t, srv) + + authErr := errors.New("auth failed") + r := &grpcPausedSandboxResumer{ + conn: conn, + client: proxygrpc.NewSandboxServiceClient(conn), + auth: errAuth{err: authErr}, + } + + ip, err := r.Resume(t.Context(), "sbx", 80, "", "") + require.ErrorIs(t, err, authErr) + require.Empty(t, ip) + require.EqualValues(t, 0, srv.calls.Load()) +} + +func TestGRPCPausedSandboxResumer_Init(t *testing.T) { + t.Parallel() + + srv := &fakeSandboxServer{} + conn := startFakeServer(t, srv) + + r := &grpcPausedSandboxResumer{ + conn: conn, + client: proxygrpc.NewSandboxServiceClient(conn), + auth: noopGrpcResumeAuth{}, + } + + // Should not panic + r.Init(t.Context()) +} + +func TestGRPCPausedSandboxResumer_Close(t *testing.T) { + t.Parallel() + + srv := &fakeSandboxServer{} + conn := startFakeServer(t, srv) + + r := &grpcPausedSandboxResumer{ + conn: conn, + client: proxygrpc.NewSandboxServiceClient(conn), + auth: noopGrpcResumeAuth{}, + } + + require.NoError(t, r.Close(t.Context())) +} diff --git a/packages/client-proxy/internal/proxy/proxy.go b/packages/client-proxy/internal/proxy/proxy.go index 36e3956b08..d51024b1eb 100644 --- a/packages/client-proxy/internal/proxy/proxy.go +++ b/packages/client-proxy/internal/proxy/proxy.go @@ -224,7 +224,7 @@ func NewClientProxy(meterProvider metric.MeterProvider, serviceName string, port meter := meterProvider.Meter(serviceName) _, err := telemetry.GetObservableUpDownCounter( meter, telemetry.ClientProxyPoolConnectionsMeterCounterName, func(_ context.Context, observer metric.Int64Observer) error { - observer.Observe(proxy.CurrentServerConnections()) + observer.Observe(proxy.CurrentPoolConnections()) return nil }, @@ -246,7 +246,7 @@ func NewClientProxy(meterProvider metric.MeterProvider, serviceName string, 
port _, err = telemetry.GetObservableUpDownCounter( meter, telemetry.ClientProxyServerConnectionsMeterCounterName, func(_ context.Context, observer metric.Int64Observer) error { - observer.Observe(proxy.CurrentPoolConnections()) + observer.Observe(proxy.CurrentServerConnections()) return nil }, diff --git a/packages/client-proxy/internal/proxy/proxy_test.go b/packages/client-proxy/internal/proxy/proxy_test.go index 5b4f47fcdd..0e623ec231 100644 --- a/packages/client-proxy/internal/proxy/proxy_test.go +++ b/packages/client-proxy/internal/proxy/proxy_test.go @@ -2,11 +2,15 @@ package proxy import ( "context" + "errors" + "net/http" + "net/http/httptest" "testing" "time" "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldtestdata" "github.com/stretchr/testify/require" + noopmetric "go.opentelemetry.io/otel/metric/noop" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -179,162 +183,359 @@ func TestCatalogResolution_CatalogMiss(t *testing.T) { require.ErrorIs(t, err, ErrNodeNotFound) } -func TestCatalogResolution_CatalogMiss_ResumeEmptyIPReturnsRouteUnavailable(t *testing.T) { - t.Parallel() - - c := catalog.NewMemorySandboxesCatalog() - ff := newFF(t, true) - - nodeIP, err := catalogResolution(t.Context(), "sbx", 8000, "", "", c, stubResumer{nodeIP: ""}, ff) - require.ErrorIs(t, err, ErrNodeRouteUnavailable) - require.Empty(t, nodeIP) +type errorCatalog struct { + err error } -func TestHandlePausedSandbox_NoResumer_MissingTrafficAccessToken(t *testing.T) { - t.Parallel() - - ff := newFF(t, true) - - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "", "", nil, ff) - require.NoError(t, err) - require.Equal(t, autoResumeNotAllowed, res) +func (e errorCatalog) GetSandbox(_ context.Context, _ string) (*catalog.SandboxInfo, error) { + return nil, e.err } -func TestHandlePausedSandbox_NoResumer_InvalidTrafficAccessToken(t *testing.T) { - t.Parallel() - - ff := newFF(t, true) - - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, 
"wrong-token", "", nil, ff) - require.NoError(t, err) - require.Equal(t, autoResumeNotAllowed, res) +func (e errorCatalog) StoreSandbox(_ context.Context, _ string, _ *catalog.SandboxInfo, _ time.Duration) error { + return nil } -func TestHandlePausedSandbox_FlagDisabled(t *testing.T) { - t.Parallel() - - ff := newFF(t, false) - - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{nodeIP: "10.0.0.1"}, ff) - require.NoError(t, err) - require.Equal(t, autoResumeNotAllowed, res) +func (e errorCatalog) DeleteSandbox(_ context.Context, _ string, _ string) error { + return nil } -func TestHandlePausedSandbox_NotFound(t *testing.T) { - t.Parallel() - - ff := newFF(t, true) - - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{err: status.Error(codes.NotFound, "not allowed")}, ff) - require.NoError(t, err) - require.Equal(t, autoResumeNotAllowed, res) +func (e errorCatalog) Close(_ context.Context) error { + return nil } -func TestHandlePausedSandbox_PermissionDenied(t *testing.T) { +func TestCatalogResolution_CatalogReturnsGenericError(t *testing.T) { t.Parallel() ff := newFF(t, true) + c := errorCatalog{err: errors.New("catalog unavailable")} - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{err: status.Error(codes.PermissionDenied, "permission denied")}, ff) + nodeIP, err := catalogResolution(t.Context(), "sbx", 8000, "", "", c, nil, ff) require.Error(t, err) - var deniedErr *reverseproxy.SandboxResumePermissionDeniedError - require.ErrorAs(t, err, &deniedErr) - require.Equal(t, autoResumePermissionDenied, res) + require.NotErrorIs(t, err, ErrNodeNotFound) + require.NotErrorIs(t, err, ErrNodeRouteUnavailable) + require.Empty(t, nodeIP) + require.Contains(t, err.Error(), "catalog unavailable") } -func TestHandlePausedSandbox_ResourceExhausted(t *testing.T) { +func TestCatalogResolution_CatalogMiss_ResumeEmptyIPReturnsRouteUnavailable(t *testing.T) { t.Parallel() + 
c := catalog.NewMemorySandboxesCatalog() ff := newFF(t, true) - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{err: status.Error(codes.ResourceExhausted, "rate limit hit")}, ff) - require.Error(t, err) - var exhaustedErr *reverseproxy.SandboxResourceExhaustedError - require.ErrorAs(t, err, &exhaustedErr) - require.Equal(t, "sbx", exhaustedErr.SandboxId) - require.Equal(t, "rate limit hit", exhaustedErr.Message) - require.Equal(t, autoResumeResourceExhausted, res) + nodeIP, err := catalogResolution(t.Context(), "sbx", 8000, "", "", c, stubResumer{nodeIP: ""}, ff) + require.ErrorIs(t, err, ErrNodeRouteUnavailable) + require.Empty(t, nodeIP) } -func TestHandlePausedSandbox_FailedPrecondition(t *testing.T) { +func TestHandlePausedSandbox(t *testing.T) { t.Parallel() - ff := newFF(t, true) + unavailableErr := status.Error(codes.Unavailable, "boom") + failedPreconditionErr := status.Error(codes.FailedPrecondition, "sandbox resume precondition failed") - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{err: status.Error(codes.FailedPrecondition, proxygrpc.SandboxStillTransitioningMessage)}, ff) - require.Error(t, err) - var transitioningErr *reverseproxy.SandboxStillTransitioningError - require.ErrorAs(t, err, &transitioningErr) - require.Equal(t, "sbx", transitioningErr.SandboxId) - require.Equal(t, autoResumeErrored, res) + tests := []struct { + name string + autoResume bool + resumer PausedSandboxResumer + trafficTok string + envdTok string + wantNodeIP string + wantResult autoResumeResult + wantErrIs error + checkErr func(*testing.T, error) + }{ + { + name: "no resumer", + autoResume: true, + resumer: nil, + wantResult: autoResumeNotAllowed, + }, + { + name: "no resumer ignores tokens", + autoResume: true, + resumer: nil, + trafficTok: "wrong-token", + envdTok: "envd-token", + wantResult: autoResumeNotAllowed, + }, + { + name: "flag disabled", + autoResume: false, + resumer: 
stubResumer{nodeIP: "10.0.0.1"}, + trafficTok: "token", + wantResult: autoResumeNotAllowed, + }, + { + name: "resume returns not found", + autoResume: true, + resumer: stubResumer{err: status.Error(codes.NotFound, "not allowed")}, + trafficTok: "token", + wantResult: autoResumeNotAllowed, + }, + { + name: "resume returns snapshot not found", + autoResume: true, + resumer: stubResumer{err: status.Error(codes.NotFound, "snapshot not found")}, + trafficTok: "token", + wantResult: autoResumeNotAllowed, + }, + { + name: "resume permission denied", + autoResume: true, + resumer: stubResumer{err: status.Error(codes.PermissionDenied, "permission denied")}, + trafficTok: "token", + wantResult: autoResumePermissionDenied, + checkErr: func(t *testing.T, err error) { + t.Helper() + + var deniedErr *reverseproxy.SandboxResumePermissionDeniedError + require.ErrorAs(t, err, &deniedErr) + require.Equal(t, "sbx", deniedErr.SandboxId) + }, + }, + { + name: "resume resource exhausted", + autoResume: true, + resumer: stubResumer{err: status.Error(codes.ResourceExhausted, "rate limit hit")}, + trafficTok: "token", + wantResult: autoResumeResourceExhausted, + checkErr: func(t *testing.T, err error) { + t.Helper() + + var exhaustedErr *reverseproxy.SandboxResourceExhaustedError + require.ErrorAs(t, err, &exhaustedErr) + require.Equal(t, "sbx", exhaustedErr.SandboxId) + require.Equal(t, "rate limit hit", exhaustedErr.Message) + }, + }, + { + name: "resume still transitioning", + autoResume: true, + resumer: stubResumer{err: status.Error(codes.FailedPrecondition, proxygrpc.SandboxStillTransitioningMessage)}, + trafficTok: "token", + wantResult: autoResumeErrored, + checkErr: func(t *testing.T, err error) { + t.Helper() + + var transitioningErr *reverseproxy.SandboxStillTransitioningError + require.ErrorAs(t, err, &transitioningErr) + require.Equal(t, "sbx", transitioningErr.SandboxId) + }, + }, + { + name: "failed precondition with other message stays generic", + autoResume: true, + 
resumer: stubResumer{err: failedPreconditionErr}, + trafficTok: "token", + wantResult: autoResumeErrored, + wantErrIs: failedPreconditionErr, + checkErr: func(t *testing.T, err error) { + t.Helper() + + var transitioningErr *reverseproxy.SandboxStillTransitioningError + require.NotErrorAs(t, err, &transitioningErr) + }, + }, + { + name: "resume returns generic grpc error", + autoResume: true, + resumer: stubResumer{err: unavailableErr}, + trafficTok: "token", + wantResult: autoResumeErrored, + wantErrIs: unavailableErr, + }, + { + name: "resume succeeds", + autoResume: true, + resumer: stubResumer{nodeIP: "10.0.0.1"}, + trafficTok: "token", + wantNodeIP: "10.0.0.1", + wantResult: autoResumeSucceeded, + }, + { + name: "resume succeeds with empty ip", + autoResume: true, + resumer: stubResumer{nodeIP: ""}, + trafficTok: "token", + wantResult: autoResumeErrored, + wantErrIs: ErrNodeRouteUnavailable, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ff := newFF(t, tt.autoResume) + nodeIP, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, tt.trafficTok, tt.envdTok, tt.resumer, ff) + + require.Equal(t, tt.wantResult, res) + require.Equal(t, tt.wantNodeIP, nodeIP) + if tt.wantErrIs != nil { + require.ErrorIs(t, err, tt.wantErrIs) + } else if tt.checkErr == nil { + require.NoError(t, err) + } + if tt.checkErr != nil { + require.Error(t, err) + tt.checkErr(t, err) + } + }) + } } -func TestHandlePausedSandbox_FailedPrecondition_OtherMessage(t *testing.T) { +func TestHandlePausedSandbox_PassesPortAndTokenToResumer(t *testing.T) { t.Parallel() ff := newFF(t, true) + resumer := &recordingResumer{} - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{err: status.Error(codes.FailedPrecondition, "sandbox resume precondition failed")}, ff) - require.Error(t, err) - var transitioningErr *reverseproxy.SandboxStillTransitioningError - require.NotErrorAs(t, err, &transitioningErr) - 
require.Equal(t, autoResumeErrored, res) + nodeIP, res, err := handlePausedSandbox(t.Context(), "sbx", 49983, "token", "envd-token", resumer, ff) + require.NoError(t, err) + require.Equal(t, autoResumeSucceeded, res) + require.Equal(t, "10.0.0.1", nodeIP) + require.Equal(t, "sbx", resumer.sandboxID) + require.EqualValues(t, 49983, resumer.sandboxPort) + require.Equal(t, "token", resumer.trafficAccessToken) + require.Equal(t, "envd-token", resumer.envdAccessToken) } -func TestHandlePausedSandbox_SnapshotNotFound(t *testing.T) { +func TestNewClientProxy_Construction(t *testing.T) { t.Parallel() + c := catalog.NewMemorySandboxesCatalog() ff := newFF(t, true) - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{err: status.Error(codes.NotFound, "snapshot not found")}, ff) + p, err := NewClientProxy(noopmetric.NewMeterProvider(), "test-service", 0, c, nil, ff) require.NoError(t, err) - require.Equal(t, autoResumeNotAllowed, res) + require.NotNil(t, p) + require.EqualValues(t, 0, p.CurrentServerConnections()) + require.EqualValues(t, 0, p.CurrentPoolConnections()) } -func TestHandlePausedSandbox_Error(t *testing.T) { +func TestNewClientProxy_HandlerErrors(t *testing.T) { t.Parallel() - ff := newFF(t, true) + tests := []struct { + name string + url string + resumer PausedSandboxResumer + wantStatus int + wantBodyContains []string + }{ + { + name: "sandbox not found", + url: "http://49983-sbx.e2b.app/", + wantStatus: http.StatusBadGateway, + wantBodyContains: []string{ + `"sandboxId":"sbx"`, + `"message":"The sandbox was not found"`, + }, + }, + { + name: "resume permission denied", + url: "http://49983-sbx.e2b.app/", + resumer: stubResumer{err: status.Error(codes.PermissionDenied, "denied")}, + wantStatus: http.StatusForbidden, + wantBodyContains: []string{ + `"sandboxId":"sbx"`, + `credentials provided`, + }, + }, + { + name: "resume resource exhausted", + url: "http://49983-sbx.e2b.app/", + resumer: stubResumer{err: 
status.Error(codes.ResourceExhausted, "rate limit")}, + wantStatus: http.StatusTooManyRequests, + wantBodyContains: []string{ + `"sandboxId":"sbx"`, + `"message":"rate limit"`, + }, + }, + { + name: "resume still transitioning", + url: "http://49983-sbx.e2b.app/", + resumer: stubResumer{err: status.Error(codes.FailedPrecondition, proxygrpc.SandboxStillTransitioningMessage)}, + wantStatus: http.StatusConflict, + wantBodyContains: []string{ + `"sandboxId":"sbx"`, + `still transitioning`, + }, + }, + { + name: "invalid host", + url: "http://invalid-host/", + wantStatus: http.StatusBadRequest, + wantBodyContains: []string{ + "Invalid host", + }, + }, + } - _, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{err: status.Error(codes.Unavailable, "boom")}, ff) - require.Error(t, err) - require.Equal(t, autoResumeErrored, res) + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + c := catalog.NewMemorySandboxesCatalog() + ff := newFF(t, true) + p, err := NewClientProxy(noopmetric.NewMeterProvider(), "handler-errors-"+tt.name, uint16(i), c, tt.resumer, ff) + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodGet, tt.url, nil).WithContext(t.Context()) + rr := httptest.NewRecorder() + p.Handler.ServeHTTP(rr, req) + + require.Equal(t, tt.wantStatus, rr.Code) + for _, want := range tt.wantBodyContains { + require.Contains(t, rr.Body.String(), want) + } + }) + } } -func TestHandlePausedSandbox_Succeeded(t *testing.T) { +func TestNewClientProxy_DuplicateMetricsRegistrationReturnsErrors(t *testing.T) { t.Parallel() + c := catalog.NewMemorySandboxesCatalog() ff := newFF(t, true) - nodeIP, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{nodeIP: "10.0.0.1"}, ff) - require.NoError(t, err) - require.Equal(t, autoResumeSucceeded, res) - require.Equal(t, "10.0.0.1", nodeIP) + // noop meter provider should not error; this is a sanity test that NewClientProxy + // works 
repeatedly with the same service name without returning a registration error. + for range 3 { + _, err := NewClientProxy(noopmetric.NewMeterProvider(), "service", 0, c, nil, ff) + require.NoError(t, err) + } } -func TestHandlePausedSandbox_Succeeded_EmptyIP(t *testing.T) { +// Sanity assertion that the proxy honors the configured idle timeout. +func TestNewClientProxy_HasIdleTimeout(t *testing.T) { t.Parallel() + c := catalog.NewMemorySandboxesCatalog() ff := newFF(t, true) - nodeIP, res, err := handlePausedSandbox(t.Context(), "sbx", 8000, "token", "", stubResumer{nodeIP: ""}, ff) - require.ErrorIs(t, err, ErrNodeRouteUnavailable) - require.Equal(t, autoResumeErrored, res) - require.Empty(t, nodeIP) + p, err := NewClientProxy(noopmetric.NewMeterProvider(), "service-idle", 0, c, nil, ff) + require.NoError(t, err) + require.GreaterOrEqual(t, p.IdleTimeout, idleTimeout) + require.Less(t, p.IdleTimeout, 2*idleTimeout) } -func TestHandlePausedSandbox_PassesPortAndTokenToResumer(t *testing.T) { +// Exercises the pool size accessor and verifies the handler responds to a request. +func TestNewClientProxy_PoolAccessors(t *testing.T) { t.Parallel() + c := catalog.NewMemorySandboxesCatalog() ff := newFF(t, true) - resumer := &recordingResumer{} - nodeIP, res, err := handlePausedSandbox(t.Context(), "sbx", 49983, "token", "envd-token", resumer, ff) + p, err := NewClientProxy(noopmetric.NewMeterProvider(), "service-pool", 0, c, nil, ff) require.NoError(t, err) - require.Equal(t, autoResumeSucceeded, res) - require.Equal(t, "10.0.0.1", nodeIP) - require.Equal(t, "sbx", resumer.sandboxID) - require.EqualValues(t, 49983, resumer.sandboxPort) - require.Equal(t, "token", resumer.trafficAccessToken) - require.Equal(t, "envd-token", resumer.envdAccessToken) + require.GreaterOrEqual(t, p.CurrentPoolSize(), 0) + + // Even on no-op meter providers, the proxy must still be wired up correctly. 
+ req := httptest.NewRequest(http.MethodGet, "http://49983-sbx.e2b.app/", nil).WithContext(t.Context()) + rr := httptest.NewRecorder() + p.Handler.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadGateway, rr.Code) + require.Contains(t, rr.Body.String(), `"sandboxId":"sbx"`) }