Skip to content

Commit b0658c0

Browse files
committed
Address GPT 5.4 feedback
Signed-off-by: droctothorpe <mythicalsunlight@gmail.com>
1 parent 56877ee commit b0658c0

8 files changed

Lines changed: 326 additions & 41 deletions

File tree

util/memo/db/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ func ConfigFromConfig(cfg *config.MemoizationConfig) Config {
6969
}
7070

7171
// SessionProxyFromConfig creates a SessionProxy from a MemoizationConfig, returning nil and logging
72-
// an error if the connection cannot be established. Callers that receive nil should fall back to
73-
// ConfigMap-based caching.
72+
// an error if the connection cannot be established. Callers that receive nil should decide how to
73+
// degrade memoization without crashing the controller.
7474
func SessionProxyFromConfig(ctx context.Context, kubectlConfig kubernetes.Interface, namespace string, cfg *config.MemoizationConfig) *sqldb.SessionProxy {
7575
if cfg == nil {
7676
return nil
@@ -90,8 +90,8 @@ func SessionProxyFromConfig(ctx context.Context, kubectlConfig kubernetes.Interf
9090
}
9191

9292
// Migrate runs database migrations for the memoization cache table. It is a no-op when
93-
// cfg.SkipMigration is true. Returns an error if migration fails; callers should fall back
94-
// to ConfigMap-based caching.
93+
// cfg.SkipMigration is true. Returns an error if migration fails; callers should decide how to
94+
// degrade memoization without crashing the controller.
9595
func Migrate(ctx context.Context, sessionProxy *sqldb.SessionProxy, cfg Config) error {
9696
if sessionProxy == nil {
9797
return nil

workflow/controller/cache/cache.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,14 @@ type Type string
124124

125125
const (
126126
// ConfigMapCache is a cache type identifier used as a key prefix in the cache map.
127-
// When a MemoizationDB is configured, SQL-backed caching is used instead.
127+
// When a MemoizationDB is configured, SQL-backed memoization semantics are used instead.
128128
ConfigMapCache Type = "ConfigMapCache"
129129
)
130130

131-
// SetQueries configures the factory to use a SQL backend, clearing any previously
132-
// cached instances so they are recreated against the new backend.
131+
// SetQueries configures the factory's memoization backend, clearing any previously
132+
// cached instances so they are recreated against the new backend. A nil MemoizationDB
133+
// selects ConfigMap-backed caching; a non-nil MemoizationDB selects SQL-backed
134+
// memoization semantics, even if the DB implementation is disabled/no-op.
133135
func (cf *cacheFactory) SetQueries(q memodb.MemoizationDB) {
134136
cf.lock.Lock()
135137
defer cf.lock.Unlock()
@@ -165,8 +167,8 @@ func (cf *cacheFactory) GetCache(ctx context.Context, ct Type, namespace, name s
165167
switch ct {
166168
case ConfigMapCache:
167169
var c MemoizationCache
168-
if cf.queries != nil && cf.queries.IsEnabled() {
169-
c = newSQLDBCache(namespace, name, cf.queries)
170+
if cf.queries != nil {
171+
c = newSQLDBCache(namespace, name, func() memodb.MemoizationDB { return cf.queries }, &cf.lock)
170172
} else {
171173
c = NewConfigMapCache(namespace, cf.kubeclient, name)
172174
}

workflow/controller/cache/cache_factory_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package cache
22

33
import (
4+
"context"
5+
"sync/atomic"
46
"testing"
7+
"time"
58

69
"github.com/stretchr/testify/assert"
710
"github.com/stretchr/testify/require"
811
"k8s.io/client-go/kubernetes/fake"
912

13+
wfv1 "github.com/argoproj/argo-workflows/v4/pkg/apis/workflow/v1alpha1"
1014
"github.com/argoproj/argo-workflows/v4/util/logging"
15+
memodb "github.com/argoproj/argo-workflows/v4/util/memo/db"
1116
)
1217

1318
func TestCacheFactoryNamespacesCachesSeparately(t *testing.T) {
@@ -34,3 +39,100 @@ func TestCacheFactoryRequiresNamespace(t *testing.T) {
3439
cache := factory.GetCache(ctx, ConfigMapCache, "", "shared-cache")
3540
assert.Nil(t, cache)
3641
}
42+
43+
type testMemoizationDB struct {
44+
enabled bool
45+
saveCalls atomic.Int32
46+
loadStart chan struct{}
47+
loadBlock chan struct{}
48+
}
49+
50+
func (t *testMemoizationDB) Load(context.Context, string, string, string) (*memodb.CacheRecord, error) {
51+
if t.loadStart != nil {
52+
close(t.loadStart)
53+
}
54+
if t.loadBlock != nil {
55+
<-t.loadBlock
56+
}
57+
return nil, nil
58+
}
59+
60+
func (t *testMemoizationDB) Save(context.Context, string, string, string, string, *wfv1.Outputs, int64) error {
61+
t.saveCalls.Add(1)
62+
return nil
63+
}
64+
65+
func (*testMemoizationDB) Prune(context.Context) (int64, error) {
66+
return 0, nil
67+
}
68+
69+
func (t *testMemoizationDB) IsEnabled() bool {
70+
return t.enabled
71+
}
72+
73+
func TestCacheFactoryStaleSQLCacheNoopsAfterDisable(t *testing.T) {
74+
ctx := logging.TestContext(t.Context())
75+
factory := NewCacheFactory(fake.NewSimpleClientset()).(*cacheFactory)
76+
queries := &testMemoizationDB{enabled: true}
77+
factory.SetQueries(queries)
78+
79+
cache := factory.GetCache(ctx, ConfigMapCache, "default", "shared-cache")
80+
require.NotNil(t, cache)
81+
82+
factory.SetQueries(nil)
83+
84+
require.NoError(t, cache.Save(ctx, "memo-key", "node-1", &wfv1.Outputs{}, "1h"))
85+
assert.Zero(t, queries.saveCalls.Load())
86+
}
87+
88+
func TestCacheFactoryDisableWaitsForInflightSQLLoad(t *testing.T) {
89+
ctx := logging.TestContext(t.Context())
90+
factory := NewCacheFactory(fake.NewSimpleClientset()).(*cacheFactory)
91+
queries := &testMemoizationDB{
92+
enabled: true,
93+
loadStart: make(chan struct{}),
94+
loadBlock: make(chan struct{}),
95+
}
96+
factory.SetQueries(queries)
97+
98+
cache := factory.GetCache(ctx, ConfigMapCache, "default", "shared-cache")
99+
require.NotNil(t, cache)
100+
101+
loadDone := make(chan struct{})
102+
go func() {
103+
defer close(loadDone)
104+
_, _ = cache.Load(ctx, "memo-key")
105+
}()
106+
107+
select {
108+
case <-queries.loadStart:
109+
case <-time.After(time.Second):
110+
t.Fatal("expected SQL load to start")
111+
}
112+
113+
setQueriesDone := make(chan struct{})
114+
go func() {
115+
factory.SetQueries(nil)
116+
close(setQueriesDone)
117+
}()
118+
119+
select {
120+
case <-setQueriesDone:
121+
t.Fatal("expected SetQueries to wait for in-flight SQL load")
122+
case <-time.After(50 * time.Millisecond):
123+
}
124+
125+
close(queries.loadBlock)
126+
127+
select {
128+
case <-setQueriesDone:
129+
case <-time.After(time.Second):
130+
t.Fatal("expected SetQueries to finish after load completes")
131+
}
132+
133+
select {
134+
case <-loadDone:
135+
case <-time.After(time.Second):
136+
t.Fatal("expected SQL load goroutine to finish")
137+
}
138+
}

workflow/controller/cache/sqldb_cache.go

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"sync"
78

89
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
910

@@ -12,24 +13,50 @@ import (
1213
)
1314

1415
type sqlDBCache struct {
15-
namespace string
16-
name string
17-
queries memodb.MemoizationDB
16+
namespace string
17+
name string
18+
getQueries func() memodb.MemoizationDB
19+
lock *sync.RWMutex
1820
}
1921

20-
func newSQLDBCache(namespace, name string, queries memodb.MemoizationDB) MemoizationCache {
22+
func newSQLDBCache(namespace, name string, getQueries func() memodb.MemoizationDB, lock *sync.RWMutex) MemoizationCache {
2123
return &sqlDBCache{
22-
namespace: namespace,
23-
name: name,
24-
queries: queries,
24+
namespace: namespace,
25+
name: name,
26+
getQueries: getQueries,
27+
lock: lock,
2528
}
2629
}
2730

31+
func (c *sqlDBCache) withQueries(fn func(memodb.MemoizationDB) error) error {
32+
if c.lock == nil {
33+
return fn(memodb.NullMemoizationDB)
34+
}
35+
c.lock.RLock()
36+
defer c.lock.RUnlock()
37+
38+
queries := memodb.NullMemoizationDB
39+
if c.getQueries != nil {
40+
if q := c.getQueries(); q != nil {
41+
queries = q
42+
}
43+
}
44+
return fn(queries)
45+
}
46+
2847
func (c *sqlDBCache) Load(ctx context.Context, key string) (*Entry, error) {
2948
if !cacheKeyRegex.MatchString(key) {
3049
return nil, fmt.Errorf("invalid cache key: %s", key)
3150
}
32-
record, err := c.queries.Load(ctx, c.namespace, c.name, key)
51+
var record *memodb.CacheRecord
52+
err := c.withQueries(func(queries memodb.MemoizationDB) error {
53+
if !queries.IsEnabled() {
54+
return nil
55+
}
56+
var err error
57+
record, err = queries.Load(ctx, c.namespace, c.name, key)
58+
return err
59+
})
3360
if err != nil {
3461
return nil, fmt.Errorf("memoization db load failed: %w", err)
3562
}
@@ -52,9 +79,14 @@ func (c *sqlDBCache) Save(ctx context.Context, key string, nodeID string, value
5279
if !cacheKeyRegex.MatchString(key) {
5380
return fmt.Errorf("invalid cache key: %s", key)
5481
}
55-
maxAgeSeconds, err := ResolveMaxAgeSeconds(maxAge)
56-
if err != nil {
57-
return err
58-
}
59-
return c.queries.Save(ctx, c.namespace, c.name, key, nodeID, value, maxAgeSeconds)
82+
return c.withQueries(func(queries memodb.MemoizationDB) error {
83+
if !queries.IsEnabled() {
84+
return nil
85+
}
86+
maxAgeSeconds, err := ResolveMaxAgeSeconds(maxAge)
87+
if err != nil {
88+
return err
89+
}
90+
return queries.Save(ctx, c.namespace, c.name, key, nodeID, value, maxAgeSeconds)
91+
})
6092
}

workflow/controller/cache/sqldb_cache_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"encoding/json"
88
"strconv"
9+
"sync"
910
"testing"
1011
"time"
1112

@@ -90,7 +91,8 @@ func newTestSQLDBCache(t *testing.T, sp *sqldb.SessionProxy) MemoizationCache {
9091
t.Helper()
9192
queries, err := memodb.NewQueries(testTableName, sp)
9293
require.NoError(t, err)
93-
return newSQLDBCache(testNamespace, testCacheName, queries)
94+
var lock sync.RWMutex
95+
return newSQLDBCache(testNamespace, testCacheName, func() memodb.MemoizationDB { return queries }, &lock)
9496
}
9597

9698
func TestSQLDBCacheSaveAndLoad(t *testing.T) {

workflow/controller/config.go

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,21 @@ var (
2525
memoizationMigrate = memodb.Migrate
2626
)
2727

28+
func (wfc *WorkflowController) resetMemoizationBackend(ctx context.Context, sessionProxy *sqldb.SessionProxy, cacheQueries memodb.MemoizationDB) {
29+
logger := logging.RequireLoggerFromContext(ctx)
30+
wfc.setMemoizationQueries(cacheQueries)
31+
wfc.cacheFactory.SetQueries(cacheQueries)
32+
if sessionProxy == nil {
33+
sessionProxy = wfc.memoSessionProxy
34+
}
35+
wfc.memoSessionProxy = nil
36+
if sessionProxy != nil {
37+
if err := sessionProxy.Close(); err != nil {
38+
logger.WithError(err).Warn(ctx, "Failed to close memoization database session")
39+
}
40+
}
41+
}
42+
2843
func (wfc *WorkflowController) updateConfig(ctx context.Context) error {
2944
logger := logging.RequireLoggerFromContext(ctx)
3045
_, err := yaml.Marshal(wfc.Config)
@@ -85,37 +100,42 @@ func (wfc *WorkflowController) updateConfig(ctx context.Context) error {
85100
logger.Info(ctx, "Persistence configuration disabled")
86101
}
87102

88-
wfc.setMemoizationQueries(memodb.NullMemoizationDB)
89103
memoCfg := wfc.Config.Memoization
90104
if memoCfg != nil {
91105
logger.Info(ctx, "Memoization database configuration enabled")
92-
if wfc.memoSessionProxy == nil {
93-
sp := memoSessionProxyFromConfig(ctx, wfc.kubeclientset, wfc.namespace, memoCfg)
94-
if sp == nil {
95-
return fmt.Errorf("failed to create memoization database session")
106+
sessionProxy := wfc.memoSessionProxy
107+
if sessionProxy == nil {
108+
sessionProxy = memoSessionProxyFromConfig(ctx, wfc.kubeclientset, wfc.namespace, memoCfg)
109+
if sessionProxy == nil {
110+
logger.Warn(ctx, "Memoization database unavailable; memoization disabled")
111+
wfc.resetMemoizationBackend(ctx, nil, memodb.NullMemoizationDB)
112+
goto memoizationConfigured
96113
}
97-
wfc.memoSessionProxy = sp
98114
}
99115
cfg := memodb.ConfigFromConfig(memoCfg)
100-
if err := memoizationMigrate(ctx, wfc.memoSessionProxy, cfg); err != nil {
101-
return fmt.Errorf("memoization database migration failed: %w", err)
116+
if err := memoizationMigrate(ctx, sessionProxy, cfg); err != nil {
117+
logger.WithError(err).Error(ctx, "Memoization database migration failed; memoization disabled")
118+
wfc.resetMemoizationBackend(ctx, sessionProxy, memodb.NullMemoizationDB)
119+
goto memoizationConfigured
102120
}
103-
queries, err := memodb.NewQueries(cfg.TableName, wfc.memoSessionProxy)
121+
queries, err := memodb.NewQueries(cfg.TableName, sessionProxy)
104122
if err != nil {
105-
return err
123+
logger.WithError(err).Error(ctx, "Memoization database initialization failed; memoization disabled")
124+
wfc.resetMemoizationBackend(ctx, sessionProxy, memodb.NullMemoizationDB)
125+
goto memoizationConfigured
106126
}
127+
wfc.memoSessionProxy = sessionProxy
107128
wfc.setMemoizationQueries(queries)
108129
wfc.cacheFactory.SetQueries(queries)
109130
} else {
110131
if wfc.memoSessionProxy != nil {
111132
logger.Info(ctx, "Memoization database configuration removed")
112-
wfc.memoSessionProxy.Close()
113-
wfc.memoSessionProxy = nil
114133
}
115-
wfc.cacheFactory.SetQueries(nil)
134+
wfc.resetMemoizationBackend(ctx, nil, nil)
116135
logger.Info(ctx, "Memoization database configuration disabled; using ConfigMap-based caching")
117136
}
118137

138+
memoizationConfigured:
119139
wfc.hydrator = hydrator.New(wfc.offloadNodeStatusRepo)
120140
wfc.updateEstimatorFactory(ctx)
121141
wfc.rateLimiter = wfc.newRateLimiter()

0 commit comments

Comments
 (0)