Skip to content

Commit 2dd262f

Browse files
authored
Merge pull request #1 from michasdev/feat/sqs
feat: sqs base implementation
2 parents 49d7073 + d491c96 commit 2dd262f

34 files changed

Lines changed: 4488 additions & 4 deletions

core/cmd/mildstack/main.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ func main() {
4545
if err := registerNativeS3Routes(router, root.Services); err != nil {
4646
return recordingHTTPServer{server: failedHTTPServer{err: err}, storage: storage, port: port, instanceID: instanceID}
4747
}
48+
if err := registerNativeSQSRoutes(router, root.Services); err != nil {
49+
return recordingHTTPServer{server: failedHTTPServer{err: err}, storage: storage, port: port, instanceID: instanceID}
50+
}
4851
registrar := instanceRegistrar{manager: manager, storage: storage, instanceID: instanceID}
4952
return recordingHTTPServer{server: deliveryhttp.NewServer(registrar, router, port), storage: storage, port: port, instanceID: instanceID}
5053
}
@@ -178,6 +181,27 @@ func registerNativeDynamoDBRoutes(router *deliveryhttp.Router, services []orches
178181
return nil
179182
}
180183

184+
func registerNativeSQSRoutes(router *deliveryhttp.Router, services []orchestrator.Service) error {
185+
if router == nil {
186+
return nil
187+
}
188+
189+
for _, service := range services {
190+
if service == nil || service.Metadata().Name != "sqs" {
191+
continue
192+
}
193+
194+
sqsService, ok := service.(deliveryhttp.SQSNativeService)
195+
if !ok {
196+
return fmt.Errorf("sqs service does not expose the native http surface")
197+
}
198+
deliveryhttp.RegisterSQSNativeRoutes(router.Engine(), sqsService)
199+
return nil
200+
}
201+
202+
return nil
203+
}
204+
181205
func containsPort(ports []int, port int) bool {
182206
for _, existing := range ports {
183207
if existing == port {

core/cmd/mildstack/main_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package main
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"io"
79
"net/http"
810
"net/http/httptest"
911
"os"
@@ -17,6 +19,7 @@ import (
1719
"github.com/aws/aws-sdk-go-v2/credentials"
1820
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
1921
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
22+
sqssdk "github.com/aws/aws-sdk-go-v2/service/sqs"
2023
"github.com/michasdev/mildstack/core/internal/application/orchestrator"
2124
"github.com/michasdev/mildstack/core/internal/application/runtime"
2225
"github.com/michasdev/mildstack/core/internal/composition"
@@ -688,6 +691,93 @@ func TestInstanceRegistrarServeSkipsDuplicateLoadedPort(t *testing.T) {
688691
}
689692
}
690693

694+
func TestRegisterNativeSQSRoutesExposesAwsCompatibleSmokeSurface(t *testing.T) {
695+
t.Helper()
696+
697+
root := composition.DefaultRoot("test-instance")
698+
manager := runtime.New(root.Services)
699+
router := deliveryhttp.NewRouter(deliveryhttp.DefaultConfig(), manager)
700+
701+
if err := registerNativeSQSRoutes(router, root.Services); err != nil {
702+
t.Fatalf("register native sqs routes: %v", err)
703+
}
704+
705+
healthRecorder := httptest.NewRecorder()
706+
healthRequest := httptest.NewRequest(http.MethodGet, "/api/v1/runtime/health", nil)
707+
router.Engine().ServeHTTP(healthRecorder, healthRequest)
708+
if got, want := healthRecorder.Code, http.StatusOK; got != want {
709+
t.Fatalf("unexpected health status: got %d want %d", got, want)
710+
}
711+
712+
rootRecorder := httptest.NewRecorder()
713+
rootRequest := httptest.NewRequest(http.MethodGet, "/?Action=ListQueues&Version=2012-11-05", nil)
714+
router.Engine().ServeHTTP(rootRecorder, rootRequest)
715+
if got, want := rootRecorder.Code, http.StatusBadRequest; got != want {
716+
t.Fatalf("unexpected sqs root status: got %d want %d", got, want)
717+
}
718+
if !strings.Contains(rootRecorder.Body.String(), "<ErrorResponse>") {
719+
t.Fatalf("expected sqs error response xml, got %q", rootRecorder.Body.String())
720+
}
721+
if !strings.Contains(rootRecorder.Body.String(), "UnsupportedOperation") {
722+
t.Fatalf("expected unsupported operation xml, got %q", rootRecorder.Body.String())
723+
}
724+
725+
server := httptest.NewServer(router.Engine())
726+
t.Cleanup(server.Close)
727+
728+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
729+
t.Cleanup(cancel)
730+
731+
transport := &captureTransport{base: http.DefaultTransport}
732+
cfg, err := config.LoadDefaultConfig(ctx,
733+
config.WithRegion("us-east-1"),
734+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("test", "test", "test")),
735+
config.WithHTTPClient(&http.Client{Transport: transport}),
736+
)
737+
if err != nil {
738+
t.Fatalf("load aws config: %v", err)
739+
}
740+
741+
client := sqssdk.NewFromConfig(cfg, func(o *sqssdk.Options) {
742+
o.BaseEndpoint = aws.String(server.URL)
743+
})
744+
745+
_, err = client.ListQueues(ctx, &sqssdk.ListQueuesInput{})
746+
if err == nil {
747+
t.Fatal("expected list queues to return an error")
748+
}
749+
if !strings.Contains(string(transport.body), "<ErrorResponse>") {
750+
t.Fatalf("expected captured sqs xml body, got %q", string(transport.body))
751+
}
752+
if !strings.Contains(string(transport.body), "UnsupportedOperation") && !strings.Contains(string(transport.body), "InvalidQueryParameter") {
753+
t.Fatalf("expected captured sqs xml body to contain an SQS error code, got %q", string(transport.body))
754+
}
755+
}
756+
757+
type captureTransport struct {
758+
base http.RoundTripper
759+
body []byte
760+
}
761+
762+
func (t *captureTransport) RoundTrip(req *http.Request) (*http.Response, error) {
763+
resp, err := t.base.RoundTrip(req)
764+
if err != nil {
765+
return nil, err
766+
}
767+
if resp.Body == nil {
768+
return resp, nil
769+
}
770+
771+
data, readErr := io.ReadAll(resp.Body)
772+
_ = resp.Body.Close()
773+
if readErr != nil {
774+
return nil, readErr
775+
}
776+
t.body = append([]byte(nil), data...)
777+
resp.Body = io.NopCloser(bytes.NewReader(data))
778+
return resp, nil
779+
}
780+
691781
func newDynamoDBSmokeClient(t *testing.T, endpoint string) *dynamodb.Client {
692782
t.Helper()
693783

core/internal/composition/default_root.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import (
99
"github.com/michasdev/mildstack/core/internal/application/runtime"
1010
"github.com/michasdev/mildstack/core/internal/resources/dynamodb"
1111
"github.com/michasdev/mildstack/core/internal/resources/s3"
12+
"github.com/michasdev/mildstack/core/internal/resources/sqs"
1213
)
1314

1415
type DefaultRootConfig struct {
1516
InstanceID string
1617
S3StorageBaseDir string
1718
DynamoDBStorageBaseDir string
19+
SQSStorageBaseDir string
1820
}
1921

2022
func DefaultRoot(instanceID string) Root {
@@ -48,7 +50,17 @@ func defaultRootWithHook(hook orchestrator.StateHook, config DefaultRootConfig)
4850
panic(fmt.Sprintf("composition: init dynamodb service: %v", err))
4951
}
5052

51-
services := []orchestrator.Service{s3Service, dynamoService}
53+
sqsService, err := sqs.NewWithStorage(sqs.StorageConfig{
54+
BaseDir: config.SQSStorageBaseDir,
55+
InstanceID: instanceID,
56+
})
57+
if err != nil {
58+
_ = s3Service.Stop(context.Background())
59+
_ = dynamoService.Stop(context.Background())
60+
panic(fmt.Sprintf("composition: init sqs service: %v", err))
61+
}
62+
63+
services := []orchestrator.Service{s3Service, dynamoService, sqsService}
5264
for _, service := range services {
5365
if err := service.AttachState(hook); err != nil {
5466
for _, candidate := range services {

core/internal/composition/default_root_test.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
dynamodbapp "github.com/michasdev/mildstack/core/internal/resources/dynamodb/application"
1010
dynamodbdomain "github.com/michasdev/mildstack/core/internal/resources/dynamodb/domain"
1111
s3domain "github.com/michasdev/mildstack/core/internal/resources/s3/domain"
12+
sqsdomain "github.com/michasdev/mildstack/core/internal/resources/sqs/domain"
1213
)
1314

1415
type stateHookStub struct {
@@ -36,19 +37,24 @@ func TestDefaultRootIncludesS3AndDynamoDBWithDeterministicRoutes(t *testing.T) {
3637
InstanceID: "test-instance",
3738
S3StorageBaseDir: baseDir,
3839
DynamoDBStorageBaseDir: baseDir,
40+
SQSStorageBaseDir: baseDir,
3941
})
40-
if got, want := len(root.Services), 2; got != want {
42+
if got, want := len(root.Services), 3; got != want {
4143
t.Fatalf("unexpected service count: got %d want %d", got, want)
4244
}
4345

4446
first := root.Services[0]
4547
second := root.Services[1]
48+
third := root.Services[2]
4649
if got, want := first.Metadata().Name, "s3"; got != want {
4750
t.Fatalf("unexpected first service name: got %q want %q", got, want)
4851
}
4952
if got, want := second.Metadata().Name, "dynamodb"; got != want {
5053
t.Fatalf("unexpected second service name: got %q want %q", got, want)
5154
}
55+
if got, want := third.Metadata().Name, "sqs"; got != want {
56+
t.Fatalf("unexpected third service name: got %q want %q", got, want)
57+
}
5258

5359
registrar := deliveryhttp.NewRegistrar()
5460
for _, service := range root.Services {
@@ -58,7 +64,7 @@ func TestDefaultRootIncludesS3AndDynamoDBWithDeterministicRoutes(t *testing.T) {
5864
}
5965

6066
entries := registrar.Services()
61-
if got, want := len(entries), 2; got != want {
67+
if got, want := len(entries), 3; got != want {
6268
t.Fatalf("unexpected catalog size: got %d want %d", got, want)
6369
}
6470
if got, want := entries[0].Name, "dynamodb"; got != want {
@@ -67,6 +73,9 @@ func TestDefaultRootIncludesS3AndDynamoDBWithDeterministicRoutes(t *testing.T) {
6773
if got, want := entries[1].Name, "s3"; got != want {
6874
t.Fatalf("unexpected second catalog service: got %q want %q", got, want)
6975
}
76+
if got, want := entries[2].Name, "sqs"; got != want {
77+
t.Fatalf("unexpected third catalog service: got %q want %q", got, want)
78+
}
7079

7180
s3Entry, ok := registrar.Service("s3")
7281
if !ok {
@@ -125,6 +134,9 @@ func TestDefaultRootIncludesS3AndDynamoDBWithDeterministicRoutes(t *testing.T) {
125134
if _, ok := root.Services[1].(deliveryhttp.DynamoDBNativeService); !ok {
126135
t.Fatal("expected dynamodb service to expose the native http surface")
127136
}
137+
if _, ok := root.Services[2].(deliveryhttp.SQSNativeService); !ok {
138+
t.Fatal("expected sqs service to expose the native http surface")
139+
}
128140

129141
if value, ok := hook.Get(dynamodbdomain.StateKey); !ok {
130142
t.Fatalf("expected state for %q to be present", dynamodbdomain.StateKey)
@@ -144,6 +156,18 @@ func TestDefaultRootIncludesS3AndDynamoDBWithDeterministicRoutes(t *testing.T) {
144156
t.Fatalf("unexpected s3 state: got %v want %v", got, want)
145157
}
146158

159+
if value, ok := hook.Get(sqsdomain.StateKey); !ok {
160+
t.Fatalf("expected state for %q to be present", sqsdomain.StateKey)
161+
} else {
162+
state := value.(map[string]any)
163+
if got, want := state["service"], "sqs"; got != want {
164+
t.Fatalf("unexpected sqs state: got %v want %v", got, want)
165+
}
166+
if got, want := len(state["queues"].([]any)), 0; got != want {
167+
t.Fatalf("unexpected sqs queue count: got %d want %d", got, want)
168+
}
169+
}
170+
147171
dynamoDBPath := filepath.Join(baseDir, "instances", "test-instance", "dynamodb", "state.db")
148172
if _, err := os.Stat(dynamoDBPath); err != nil {
149173
t.Fatalf("expected dynamodb database to exist at %s: %v", dynamoDBPath, err)
@@ -186,8 +210,9 @@ func TestDefaultRootUsesInstanceScopedDynamoDBStorage(t *testing.T) {
186210
InstanceID: "instance-a",
187211
S3StorageBaseDir: baseDir,
188212
DynamoDBStorageBaseDir: baseDir,
213+
SQSStorageBaseDir: baseDir,
189214
})
190-
if got, want := len(root.Services), 2; got != want {
215+
if got, want := len(root.Services), 3; got != want {
191216
t.Fatalf("unexpected service count: got %d want %d", got, want)
192217
}
193218

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package http
2+
3+
import (
4+
"errors"
5+
"net/http"
6+
"strings"
7+
8+
"github.com/gin-gonic/gin"
9+
"github.com/michasdev/mildstack/core/internal/application/orchestrator"
10+
)
11+
12+
type SQSNativeService interface {
13+
Policy() orchestrator.EmulationPolicy
14+
Metadata() orchestrator.Metadata
15+
}
16+
17+
func RegisterSQSNativeRoutes(engine *gin.Engine, service SQSNativeService) {
18+
if engine == nil || service == nil {
19+
return
20+
}
21+
22+
handler := newSQSNativeHandler(service)
23+
engine.Use(func(c *gin.Context) {
24+
if handled := handler.dispatch(c); handled {
25+
c.Abort()
26+
return
27+
}
28+
c.Next()
29+
})
30+
}
31+
32+
type sqsNativeHandler struct {
33+
service SQSNativeService
34+
registry SQSRegistry
35+
supported map[string]struct{}
36+
}
37+
38+
func newSQSNativeHandler(service SQSNativeService) sqsNativeHandler {
39+
supported := make(map[string]struct{})
40+
if service != nil {
41+
for _, action := range service.Policy().Supported {
42+
supported[action] = struct{}{}
43+
}
44+
}
45+
46+
return sqsNativeHandler{
47+
service: service,
48+
registry: NewSQSRegistry(),
49+
supported: supported,
50+
}
51+
}
52+
53+
func (h sqsNativeHandler) dispatch(c *gin.Context) bool {
54+
if c == nil || c.Request == nil || c.Request.URL == nil {
55+
return false
56+
}
57+
58+
path := strings.TrimSpace(c.Request.URL.Path)
59+
if path == "" || strings.HasPrefix(path, "/api/") {
60+
return false
61+
}
62+
switch c.Request.Method {
63+
case http.MethodGet, http.MethodPost:
64+
default:
65+
return false
66+
}
67+
68+
ctx, err := ParseSQSRequest(c.Request)
69+
if err != nil {
70+
if errors.Is(err, ErrSQSNotOwned) {
71+
return false
72+
}
73+
writeSQSError(c, err, requestIDFromContext(c))
74+
return true
75+
}
76+
77+
spec, err := h.registry.Resolve(ctx)
78+
if err != nil {
79+
writeSQSError(c, err, requestIDFromContext(c))
80+
return true
81+
}
82+
83+
if _, ok := h.supported[spec.Action]; !ok || spec.DomainDeferred {
84+
writeSQSError(c, ErrSQSUnsupported, requestIDFromContext(c))
85+
return true
86+
}
87+
88+
writeSQSError(c, ErrSQSUnsupported, requestIDFromContext(c))
89+
return true
90+
}
91+
92+
func requestIDFromContext(c *gin.Context) string {
93+
if c == nil {
94+
return "mildstack-sqs-request"
95+
}
96+
97+
for _, key := range []string{"x-amzn-requestid", "X-Amzn-RequestId", "x-amz-request-id"} {
98+
if requestID := strings.TrimSpace(c.GetHeader(key)); requestID != "" {
99+
return requestID
100+
}
101+
}
102+
103+
return "mildstack-sqs-request"
104+
}

0 commit comments

Comments
 (0)