Skip to content

Commit 539b00a

Browse files
committed
Create lifecycle state
1 parent cd9a4d1 commit 539b00a

9 files changed

Lines changed: 258 additions & 12 deletions

File tree

connect.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/exanubes/appsync/internal/app"
1010
"github.com/exanubes/appsync/internal/app/engine"
1111
"github.com/exanubes/appsync/internal/app/heartbeat"
12+
"github.com/exanubes/appsync/internal/app/lifecycle"
1213
"github.com/exanubes/appsync/internal/app/pending"
1314
"github.com/exanubes/appsync/internal/app/queue"
1415
"github.com/exanubes/appsync/internal/app/router"
@@ -133,6 +134,7 @@ func (builder *builder) Connect(ctx context.Context) (*appsync_client, error) {
133134
return nil, err
134135
}
135136

137+
connection_state := lifecycle.NewState()
136138
clock := clock.New()
137139
heartbeat := heartbeat.New(clock)
138140
ingress_queue := queue.NewIngressQueue(builder.backpressure.ConnectionInbound)
@@ -149,7 +151,13 @@ func (builder *builder) Connect(ctx context.Context) (*appsync_client, error) {
149151

150152
msg_router := router.New(pending_registry, usecases.ReceiveData)
151153
runtime := runtime.New(ingress_queue, msg_router, heartbeat)
152-
session := engine.New(heartbeat, runtime, io_loops, builder.logger)
154+
session := engine.New(
155+
heartbeat,
156+
runtime,
157+
io_loops,
158+
connection_state,
159+
builder.logger,
160+
)
153161
session.Start(ctx, engine.StartEngineInput{
154162
Timeout: connection_output.Timeout,
155163
})

errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ var (
1010
ErrSubscriptionClosed = app.ErrSubscriptionClosed
1111
ErrSubscriptionNotFound = app.ErrSubscriptionNotFound
1212
ErrHeartbeatTimeout = app.ErrHeartbeatTimeout
13+
ErrConnectionClosed = app.ErrConnectionClosed
1314
)

internal/app/engine/engine.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@ type Engine struct {
1717
wg sync.WaitGroup
1818
ctx context.Context
1919
cancel context.CancelFunc
20+
state ConnectionState
2021
}
2122

2223
var managed_goroutines_count = 4
2324

24-
func New(heartbeat app.Heartbeat, runtime Runtime, io IO, logger app.Logger) *Engine {
25+
func New(heartbeat app.Heartbeat, runtime Runtime, io IO, state ConnectionState, logger app.Logger) *Engine {
2526
return &Engine{
2627
io: io,
2728
logger: logger.SetContext("Engine"),
2829
runtime: runtime,
2930
heartbeat: heartbeat,
31+
state: state,
3032
err_channel: make(chan error, managed_goroutines_count),
3133
}
3234
}
@@ -35,28 +37,36 @@ func (engine *Engine) Start(ctx context.Context, input StartEngineInput) {
3537
engine.ctx, engine.cancel = context.WithCancel(ctx)
3638
engine.wg.Add(managed_goroutines_count)
3739
go func() {
38-
engine.err_channel <- engine.io.Read(engine.ctx)
40+
err := engine.io.Read(engine.ctx)
41+
engine.state.Close(err)
42+
engine.err_channel <- err
3943
engine.logger.Debug("Exitted ingress loop")
4044
engine.wg.Done()
4145
engine.cancel()
4246
}()
4347

4448
go func() {
45-
engine.err_channel <- engine.io.Write(engine.ctx)
49+
err := engine.io.Write(engine.ctx)
50+
engine.state.Close(err)
51+
engine.err_channel <- err
4652
engine.logger.Debug("Exitted egress loop")
4753
engine.wg.Done()
4854
engine.cancel()
4955
}()
5056

5157
go func() {
52-
engine.err_channel <- engine.runtime.Run(engine.ctx)
58+
err := engine.runtime.Run(engine.ctx)
59+
engine.state.Close(err)
60+
engine.err_channel <- err
5361
engine.logger.Debug("Exitted runtime loop")
5462
engine.wg.Done()
5563
engine.cancel()
5664
}()
5765

5866
go func() {
59-
engine.err_channel <- engine.heartbeat.Start(engine.ctx, input.Timeout)
67+
err := engine.heartbeat.Start(engine.ctx, input.Timeout)
68+
engine.state.Close(err)
69+
engine.err_channel <- err
6070
engine.logger.Debug("Exitted heartbeat loop")
6171
engine.wg.Done()
6272
engine.cancel()

internal/app/engine/engine_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,13 @@ func (m *mock_heartbeat) Start(ctx context.Context, _ time.Duration) error {
6868

6969
func (m *mock_heartbeat) Reset() {}
7070

71+
type mock_connection_state struct{}
72+
73+
func (m *mock_connection_state) Close(cause error) {}
74+
7175
func TestNew(t *testing.T) {
7276
logger := &stub_logger{}
73-
engine.New(&mock_heartbeat{}, &mock_runtime{}, &mock_io{}, logger)
77+
engine.New(&mock_heartbeat{}, &mock_runtime{}, &mock_io{}, &mock_connection_state{}, logger)
7478
if logger.ctx != "Engine" {
7579
t.Errorf("expected logger context 'Engine', got %q", logger.ctx)
7680
}
@@ -81,7 +85,7 @@ func TestStart_calls_all_goroutines(t *testing.T) {
8185
runtime := &mock_runtime{}
8286
hb := &mock_heartbeat{}
8387

84-
e := engine.New(hb, runtime, io, &stub_logger{})
88+
e := engine.New(hb, runtime, io, &mock_connection_state{}, &stub_logger{})
8589
e.Start(context.Background(), engine.StartEngineInput{Timeout: time.Second})
8690
e.Close(context.Background())
8791

@@ -137,7 +141,7 @@ func TestClose(t *testing.T) {
137141
runtime := &mock_runtime{err: tt.err}
138142
hb := &mock_heartbeat{err: tt.err}
139143

140-
e := engine.New(hb, runtime, io, &stub_logger{})
144+
e := engine.New(hb, runtime, io, &mock_connection_state{}, &stub_logger{})
141145
e.Start(context.Background(), engine.StartEngineInput{Timeout: time.Second})
142146
err := e.Close(context.Background())
143147

internal/app/engine/ports.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,7 @@ type Runtime interface {
3131
type Inbox interface {
3232
Next(context.Context) (app.Message, error)
3333
}
34+
35+
type ConnectionState interface {
36+
Close(error)
37+
}

internal/app/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ var ErrSubscriptionInboxFull = errors.New("Subscription incoming message buffer
99
var ErrSubscriptionClosed = errors.New("Subscription is closed")
1010
var ErrSubscriptionNotFound = errors.New("Subscription not found")
1111
var ErrHeartbeatTimeout = errors.New("Heartbeat timeout")
12+
var ErrConnectionClosed = errors.New("Connection closed")
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package lifecycle
2+
3+
import (
4+
"errors"
5+
"sync"
6+
7+
"github.com/exanubes/appsync/internal/app"
8+
)
9+
10+
type State struct {
11+
once sync.Once
12+
done chan struct{}
13+
mutex sync.RWMutex
14+
err error
15+
}
16+
17+
func NewState() *State {
18+
return &State{
19+
done: make(chan struct{}),
20+
}
21+
}
22+
23+
func (state *State) Done() <-chan struct{} {
24+
return state.done
25+
}
26+
27+
func (state *State) Err() error {
28+
select {
29+
case <-state.done:
30+
state.mutex.RLock()
31+
defer state.mutex.RUnlock()
32+
33+
if state.err == nil {
34+
return app.ErrConnectionClosed
35+
}
36+
37+
return state.err
38+
default:
39+
return nil
40+
}
41+
}
42+
43+
func (state *State) Close(cause error) {
44+
state.once.Do(func() {
45+
if cause == nil {
46+
cause = app.ErrConnectionClosed
47+
}
48+
49+
if !errors.Is(cause, app.ErrConnectionClosed) {
50+
cause = errors.Join(app.ErrConnectionClosed, cause)
51+
}
52+
53+
state.mutex.Lock()
54+
state.err = cause
55+
state.mutex.Unlock()
56+
57+
close(state.done)
58+
})
59+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package lifecycle_test
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/exanubes/appsync/internal/app"
10+
"github.com/exanubes/appsync/internal/app/lifecycle"
11+
)
12+
13+
var errCustom = errors.New("custom error")
14+
15+
func closed_state(cause error) *lifecycle.State {
16+
s := lifecycle.NewState()
17+
s.Close(cause)
18+
return s
19+
}
20+
21+
func TestNewState(t *testing.T) {
22+
t.Run("done channel is open", func(t *testing.T) {
23+
s := lifecycle.NewState()
24+
select {
25+
case <-s.Done():
26+
t.Fatal("Done() should not be readable on a new state")
27+
default:
28+
}
29+
})
30+
}
31+
32+
func TestDone(t *testing.T) {
33+
t.Run("blocks before Close", func(t *testing.T) {
34+
s := lifecycle.NewState()
35+
select {
36+
case <-s.Done():
37+
t.Fatal("Done() should block before Close()")
38+
default:
39+
}
40+
})
41+
42+
t.Run("readable after Close", func(t *testing.T) {
43+
s := closed_state(nil)
44+
select {
45+
case <-s.Done():
46+
case <-time.After(100 * time.Millisecond):
47+
t.Fatal("Done() should be readable after Close()")
48+
}
49+
})
50+
}
51+
52+
func TestErr(t *testing.T) {
53+
tests := []struct {
54+
name string
55+
state func() *lifecycle.State
56+
is_closed bool
57+
is_custom bool
58+
}{
59+
{
60+
name: "returns nil before Close",
61+
state: lifecycle.NewState,
62+
is_closed: false,
63+
is_custom: false,
64+
},
65+
{
66+
name: "returns ErrConnectionClosed when closed with nil",
67+
state: func() *lifecycle.State { return closed_state(nil) },
68+
is_closed: true,
69+
is_custom: false,
70+
},
71+
{
72+
name: "returns ErrConnectionClosed when closed with ErrConnectionClosed",
73+
state: func() *lifecycle.State { return closed_state(app.ErrConnectionClosed) },
74+
is_closed: true,
75+
is_custom: false,
76+
},
77+
{
78+
name: "wraps custom error with ErrConnectionClosed",
79+
state: func() *lifecycle.State { return closed_state(errCustom) },
80+
is_closed: true,
81+
is_custom: true,
82+
},
83+
{
84+
name: "preserves pre-joined error containing ErrConnectionClosed and custom error",
85+
state: func() *lifecycle.State { return closed_state(errors.Join(app.ErrConnectionClosed, errCustom)) },
86+
is_closed: true,
87+
is_custom: true,
88+
},
89+
}
90+
91+
for _, tt := range tests {
92+
t.Run(tt.name, func(t *testing.T) {
93+
err := tt.state().Err()
94+
95+
if !tt.is_closed {
96+
if err != nil {
97+
t.Errorf("Err() = %v, want nil", err)
98+
}
99+
return
100+
}
101+
102+
if err == nil {
103+
t.Fatal("Err() = nil, want non-nil error")
104+
}
105+
if !errors.Is(err, app.ErrConnectionClosed) {
106+
t.Errorf("Err() = %v, want errors.Is(err, ErrConnectionClosed) == true", err)
107+
}
108+
if tt.is_custom && !errors.Is(err, errCustom) {
109+
t.Errorf("Err() = %v, want errors.Is(err, errCustom) == true", err)
110+
}
111+
})
112+
}
113+
}
114+
115+
func TestClose(t *testing.T) {
116+
t.Run("closes Done channel", func(t *testing.T) {
117+
s := lifecycle.NewState()
118+
s.Close(nil)
119+
select {
120+
case <-s.Done():
121+
case <-time.After(100 * time.Millisecond):
122+
t.Fatal("Done() should be closed after Close()")
123+
}
124+
})
125+
126+
t.Run("idempotent: second call does not overwrite first", func(t *testing.T) {
127+
s := lifecycle.NewState()
128+
s.Close(errCustom)
129+
s.Close(nil)
130+
131+
if err := s.Err(); !errors.Is(err, errCustom) {
132+
t.Errorf("Err() = %v, want errors.Is(err, errCustom) == true after second Close(nil)", err)
133+
}
134+
})
135+
136+
t.Run("concurrent calls are safe and close exactly once", func(t *testing.T) {
137+
const goroutines = 100
138+
s := lifecycle.NewState()
139+
140+
var wg sync.WaitGroup
141+
wg.Add(goroutines)
142+
start := make(chan struct{})
143+
144+
for range goroutines {
145+
go func() {
146+
defer wg.Done()
147+
<-start
148+
s.Close(errCustom)
149+
}()
150+
}
151+
152+
close(start)
153+
wg.Wait()
154+
155+
if err := s.Err(); err == nil {
156+
t.Fatal("Err() = nil after concurrent Close calls, want non-nil")
157+
}
158+
})
159+
}

internal/app/services/connection/create_connection_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,9 @@ func TestConnect(t *testing.T) {
146146
},
147147
},
148148
{
149-
name: "generated subprotocol appended to input subprotocols",
150-
generator: &mock_subprotocol_generator{result: "header-xyz"},
151-
dialer: &mock_dialer{conn: conn},
149+
name: "generated subprotocol appended to input subprotocols",
150+
generator: &mock_subprotocol_generator{result: "header-xyz"},
151+
dialer: &mock_dialer{conn: conn},
152152
authorizer: &mock_connection_authorizer{timeout: timeout},
153153
input: connection.CreateConnectionInput{
154154
Url: endpoint,

0 commit comments

Comments
 (0)