Skip to content

Commit 450d123

Browse files
committed
Account for closed connection when publishing messages
1 parent 539b00a commit 450d123

12 files changed

Lines changed: 82 additions & 19 deletions

File tree

client.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package appsync
33
import (
44
"context"
55

6+
"github.com/exanubes/appsync/internal/app/lifecycle"
67
"github.com/exanubes/appsync/internal/app/usecases/publish"
78
"github.com/exanubes/appsync/internal/app/usecases/subscribe"
89
"github.com/exanubes/appsync/internal/composition"
@@ -13,10 +14,15 @@ import (
1314
const ProtocolEvents = "aws-appsync-event-ws"
1415

1516
type appsync_client struct {
16-
usecases *composition.UseCases
17+
usecases *composition.UseCases
18+
connection *lifecycle.State
1719
}
1820

1921
func (client *appsync_client) Publish(ctx context.Context, input PublishCommandInput) error {
22+
if err := client.connection.Err(); err != nil {
23+
return err
24+
}
25+
2026
frame := &events.FrameBuilder{}
2127

2228
err := client.usecases.Publish.Publish(ctx, publish.PublishCommandInput{
@@ -28,6 +34,10 @@ func (client *appsync_client) Publish(ctx context.Context, input PublishCommandI
2834
}
2935

3036
func (client *appsync_client) Subscribe(ctx context.Context, input SubscribeCommandInput) (Subscription, error) {
37+
if err := client.connection.Err(); err != nil {
38+
return nil, err
39+
}
40+
3141
frame := &events.FrameBuilder{}
3242
result, err := client.usecases.Subscribe.Execute(ctx, subscribe.SubscribeCommandInput{
3343
Channel: input.Channel,

connect.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ func (builder *builder) Connect(ctx context.Context) (*appsync_client, error) {
138138
clock := clock.New()
139139
heartbeat := heartbeat.New(clock)
140140
ingress_queue := queue.NewIngressQueue(builder.backpressure.ConnectionInbound)
141-
egress_queue := queue.NewEgressQueue(builder.backpressure.ConnectionOutbound)
142-
pending_registry := pending.NewRegistry()
141+
egress_queue := queue.NewEgressQueue(builder.backpressure.ConnectionOutbound, connection_state)
142+
pending_registry := pending.NewRegistry(connection_state)
143143
io_loops := io.New(ingress_queue, egress_queue, connection_output.Connection, msg_codec)
144144
usecases, services := composition.NewUseCases(
145145
request_authorizer,
@@ -172,7 +172,8 @@ func (builder *builder) Connect(ctx context.Context) (*appsync_client, error) {
172172
usecases.Shutdown = shutdown_connection_usecase
173173

174174
return &appsync_client{
175-
usecases: usecases,
175+
usecases: usecases,
176+
connection: connection_state,
176177
}, nil
177178
}
178179

internal/app/pending/pending.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
package pending
22

3-
import "context"
3+
import (
4+
"context"
5+
6+
"github.com/exanubes/appsync/internal/app"
7+
)
48

59
type Registry struct {
6-
store map[string]chan error
10+
store map[string]chan error
11+
connection ConnectionState
712
}
813

9-
func NewRegistry() *Registry {
14+
func NewRegistry(connection ConnectionState) *Registry {
1015
return &Registry{
11-
store: make(map[string]chan error),
16+
store: make(map[string]chan error),
17+
connection: connection,
1218
}
1319
}
1420

@@ -33,9 +39,15 @@ func (registry Registry) Fulfill(ctx context.Context, id string, err error) erro
3339
return ctx.Err()
3440
case reply <- err:
3541
return nil
42+
case <-registry.connection.Done():
43+
return app.ErrConnectionClosed
3644
}
3745
}
3846

47+
func (registry Registry) Remove(id string) {
48+
delete(registry.store, id)
49+
}
50+
3951
func (registry Registry) get(id string) chan error {
4052
return registry.store[id]
4153
}
@@ -54,5 +66,7 @@ func (registry Registry) Consume(ctx context.Context, id string) error {
5466
return ctx.Err()
5567
case res := <-reply:
5668
return res
69+
case <-registry.connection.Done():
70+
return app.ErrConnectionClosed
5771
}
5872
}

internal/app/pending/pending_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"github.com/exanubes/appsync/internal/app/pending"
99
)
1010

11+
type mock_connection_state struct{}
12+
13+
func (m *mock_connection_state) Done() <-chan struct{} { return make(chan struct{}) }
14+
1115
func TestHas(t *testing.T) {
1216
tests := []struct {
1317
name string
@@ -33,7 +37,7 @@ func TestHas(t *testing.T) {
3337

3438
for _, tt := range tests {
3539
t.Run(tt.name, func(t *testing.T) {
36-
registry := pending.NewRegistry()
40+
registry := pending.NewRegistry(&mock_connection_state{})
3741
tt.setup(registry)
3842
has := registry.Has(tt.id)
3943
if has != tt.expect_has {
@@ -56,7 +60,7 @@ func TestRegister(t *testing.T) {
5660

5761
for _, tt := range tests {
5862
t.Run(tt.name, func(t *testing.T) {
59-
registry := pending.NewRegistry()
63+
registry := pending.NewRegistry(&mock_connection_state{})
6064
registry.Register(tt.id)
6165
if !registry.Has(tt.id) {
6266
t.Errorf("Has(%q) = false after Register, want true", tt.id)
@@ -123,7 +127,7 @@ func TestFulfill(t *testing.T) {
123127

124128
for _, tt := range tests {
125129
t.Run(tt.name, func(t *testing.T) {
126-
registry := pending.NewRegistry()
130+
registry := pending.NewRegistry(&mock_connection_state{})
127131
tt.setup(registry)
128132
err := registry.Fulfill(tt.ctx(), tt.id, tt.err)
129133
if !errors.Is(err, tt.expect_err) {
@@ -203,7 +207,7 @@ func TestConsume(t *testing.T) {
203207

204208
for _, tt := range tests {
205209
t.Run(tt.name, func(t *testing.T) {
206-
registry := pending.NewRegistry()
210+
registry := pending.NewRegistry(&mock_connection_state{})
207211
tt.setup(registry)
208212
err := registry.Consume(tt.ctx(), tt.id)
209213
if !errors.Is(err, tt.expect_err) {

internal/app/pending/ports.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package pending
2+
3+
type ConnectionState interface {
4+
Done() <-chan struct{}
5+
}

internal/app/queue/egress-queue.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@ package queue
22

33
import (
44
"context"
5+
6+
"github.com/exanubes/appsync/internal/app"
57
)
68

79
type EgressQueue struct {
8-
inbox chan []byte
10+
inbox chan []byte
11+
connection ConnectionState
912
}
1013

11-
func NewEgressQueue(max_size uint) *EgressQueue {
14+
func NewEgressQueue(max_size uint, connection ConnectionState) *EgressQueue {
1215
return &EgressQueue{
13-
inbox: make(chan []byte, max_size),
16+
inbox: make(chan []byte, max_size),
17+
connection: connection,
1418
}
1519
}
1620

@@ -29,5 +33,7 @@ func (registry *EgressQueue) Enqueue(ctx context.Context, payload []byte) error
2933
return nil
3034
case <-ctx.Done():
3135
return ctx.Err()
36+
case <-registry.connection.Done():
37+
return app.ErrConnectionClosed
3238
}
3339
}

internal/app/queue/egress_queue_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"github.com/exanubes/appsync/internal/app/queue"
99
)
1010

11+
type mock_connection_state struct{}
12+
13+
func (m *mock_connection_state) Done() <-chan struct{} { return make(chan struct{}) }
14+
1115
func TestEgressQueue_Next(t *testing.T) {
1216
payload := []byte("test-payload")
1317

@@ -42,7 +46,7 @@ func TestEgressQueue_Next(t *testing.T) {
4246

4347
for _, tt := range tests {
4448
t.Run(tt.name, func(t *testing.T) {
45-
q := queue.NewEgressQueue(1)
49+
q := queue.NewEgressQueue(1, &mock_connection_state{})
4650
tt.setup(q)
4751

4852
got, err := q.Next(tt.ctx())
@@ -91,7 +95,7 @@ func TestEgressQueue_Enqueue(t *testing.T) {
9195

9296
for _, tt := range tests {
9397
t.Run(tt.name, func(t *testing.T) {
94-
q := queue.NewEgressQueue(1)
98+
q := queue.NewEgressQueue(1, &mock_connection_state{})
9599
tt.setup(q)
96100

97101
err := q.Enqueue(tt.ctx(), tt.payload)

internal/app/queue/ports.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package queue
2+
3+
type ConnectionState interface {
4+
Done() <-chan struct{}
5+
}

internal/app/router/router_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
"github.com/exanubes/appsync/internal/app/router"
1111
)
1212

13+
type mock_connection_state struct{}
14+
15+
func (m *mock_connection_state) Done() <-chan struct{} { return make(chan struct{}) }
16+
1317
type mock_receive_data struct {
1418
called bool
1519
received protocol.DataMessage
@@ -110,7 +114,7 @@ func TestHandle(t *testing.T) {
110114

111115
for _, tt := range tests {
112116
t.Run(tt.name, func(t *testing.T) {
113-
registry := pending.NewRegistry()
117+
registry := pending.NewRegistry(&mock_connection_state{})
114118
if tt.setup != nil {
115119
tt.setup(registry)
116120
}

internal/app/services/request/ports.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type Outbox interface {
1010

1111
type Registry interface {
1212
Register(id string)
13+
Remove(id string)
1314
Consume(ctx context.Context, id string) error
1415
Has(id string) bool
1516
}

0 commit comments

Comments
 (0)