Skip to content

Commit 8e4647b

Browse files
fix: add suspend column alias and improve test coverage for review feedback
Signed-off-by: Mario Apostolov <mario.apostolov@mariadb.com>
1 parent 163c051 commit 8e4647b

8 files changed

Lines changed: 64 additions & 64 deletions

docs/offloading-large-workflows.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
55
Argo stores workflows as Kubernetes resources (i.e. within EtcD). This creates a limit to their size as resources must be under 1MB. Each resource includes the status of each node, which is stored in the `/status/nodes` field for the resource. This can be over 1MB. If this happens, we try and compress the node status and store it in `/status/compressedNodes`. If the status is still too large, we then try and store it in an SQL database.
66

7-
To enable this feature, configure a Postgres or MySQL database under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `nodeStatusOffLoad: true`.
7+
To enable this feature, configure a Postgres, MySQL, or MariaDB database under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `nodeStatusOffLoad: true`.
88

99
## FAQ
1010

docs/synchronization-config.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ Database-based limits allow multiple workflow controllers (typically across diff
3131

3232
Before you can manage database limits via the API, you must:
3333

34-
1. Configure a PostgreSQL or MySQL database for synchronization (see [workflow synchronization](synchronization.md#database-configuration))
34+
1. Configure a PostgreSQL, MySQL, or MariaDB database for synchronization (see [workflow synchronization](synchronization.md#database-configuration))
3535
2. Enable the synchronization API in your workflow controller configuration
3636

3737
### Enable the API

docs/synchronization.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ spec:
6868
Multiple controllers can share locks using a database as an intermediary.
6969
This would normally be used to share locks across multiple clusters, but can also be used to share locks across multiple controllers in the same cluster.
7070
71-
To configure multiple controller locks, you need to set up a database (either PostgreSQL or MySQL) and [configure it](#database-configuration) in the workflow-controller-configmap ConfigMap.
71+
To configure multiple controller locks, you need to set up a database (either PostgreSQL, MySQL, or MariaDB) and [configure it](#database-configuration) in the workflow-controller-configmap ConfigMap.
7272
All controllers which want to share locks must share all of these tables.
7373
If you do not configure the database you will get an error if you try to use database locks.
7474

persist/sqldb/workflow_archive.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ func (r *workflowArchive) ListWorkflows(ctx context.Context, options sutils.List
187187
db.Raw("coalesce(JSON_EXTRACT(workflow, '$.metadata.labels'), '{}') as labels"),
188188
db.Raw("coalesce(JSON_EXTRACT(workflow, '$.metadata.annotations'), '{}') as annotations"),
189189
db.Raw("coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.status.progress')), '') as progress"),
190-
db.Raw("JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.spec.suspend'))"),
191-
db.Raw("coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.spec.arguments')), '{}') as arguments"),
190+
db.Raw("JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.spec.suspend')) as suspend"),
191+
db.Raw("coalesce(JSON_EXTRACT(workflow, '$.spec.arguments'), '{}') as arguments"),
192192
db.Raw("coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.status.message')), '') as message"),
193193
db.Raw("coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.status.estimatedDuration')), '0') as estimatedduration"),
194194
db.Raw("coalesce(JSON_EXTRACT(workflow, '$.status.resourcesDuration'), '{}') as resourcesduration"),
Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
//go:build !windows
2+
13
package sqldb
24

35
import (
46
"context"
5-
"runtime"
67
"strconv"
78
"testing"
89
"time"
@@ -14,6 +15,7 @@ import (
1415
"github.com/testcontainers/testcontainers-go/wait"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1617
"k8s.io/apimachinery/pkg/types"
18+
"k8s.io/utils/ptr"
1719

1820
"github.com/argoproj/argo-workflows/v4/config"
1921
wfv1 "github.com/argoproj/argo-workflows/v4/pkg/apis/workflow/v1alpha1"
@@ -23,31 +25,27 @@ import (
2325
usqldb "github.com/argoproj/argo-workflows/v4/util/sqldb"
2426
)
2527

26-
type mysqlVariant struct {
27-
image string
28-
waitMessage string
29-
}
30-
31-
var mysqlVariants = map[string]mysqlVariant{
32-
"MySQL": {image: "mysql:8.4", waitMessage: "port: 3306 MySQL Community Server"},
33-
"MariaDB": {image: "mariadb:11.4", waitMessage: "mariadbd: ready for connections"},
34-
}
35-
36-
func setupMySQLArchiveTest(ctx context.Context, t *testing.T, v mysqlVariant) (WorkflowArchive, func()) {
28+
// setupMySQLArchiveTest starts a MySQL or MariaDB container, runs migrations, and returns a WorkflowArchive.
29+
func setupMySQLArchiveTest(ctx context.Context, t *testing.T, v usqldb.MySQLVariant) WorkflowArchive {
3730
t.Helper()
3831

3932
c, err := testmysql.Run(ctx,
40-
v.image,
33+
v.Image,
4134
testmysql.WithDatabase("argo"),
4235
testmysql.WithUsername("argo"),
4336
testmysql.WithPassword("argo"),
4437
testcontainers.WithWaitStrategy(
4538
wait.ForAll(
46-
wait.ForLog(v.waitMessage).WithStartupTimeout(60*time.Second),
39+
wait.ForLog(v.WaitMessage).WithStartupTimeout(60*time.Second),
4740
wait.ForListeningPort("3306/tcp"),
4841
)),
4942
)
5043
require.NoError(t, err)
44+
t.Cleanup(func() {
45+
if err := testcontainers.TerminateContainer(c); err != nil {
46+
t.Logf("failed to terminate container: %s", err)
47+
}
48+
})
5149

5250
host, err := c.Host(ctx)
5351
require.NoError(t, err)
@@ -74,24 +72,18 @@ func setupMySQLArchiveTest(ctx context.Context, t *testing.T, v mysqlVariant) (W
7472
err = Migrate(ctx, proxy.Session(), "test", "argo_workflows", proxy.DBType())
7573
require.NoError(t, err)
7674

77-
return NewWorkflowArchive(proxy, "test", "", instanceid.NewService("")), func() {
78-
proxy.Close()
79-
testcontainers.TerminateContainer(c) //nolint:errcheck
80-
}
75+
t.Cleanup(func() { proxy.Close() })
76+
77+
return NewWorkflowArchive(proxy, "test", "", instanceid.NewService(""))
8178
}
8279

8380
// TestMySQLListWorkflows verifies that JSON_EXTRACT/JSON_UNQUOTE queries in
8481
// ListWorkflows execute correctly against both MySQL and MariaDB.
8582
func TestMySQLListWorkflows(t *testing.T) {
86-
if runtime.GOOS == "windows" {
87-
t.Skip("test requires Linux container")
88-
}
89-
90-
for name, variant := range mysqlVariants {
83+
for name, variant := range usqldb.MySQLVariants {
9184
t.Run(name, func(t *testing.T) {
9285
ctx := logging.TestContext(t.Context())
93-
archive, cleanup := setupMySQLArchiveTest(ctx, t, variant)
94-
defer cleanup()
86+
archive := setupMySQLArchiveTest(ctx, t, variant)
9587

9688
now := metav1.Now()
9789
err := archive.ArchiveWorkflow(ctx, &wfv1.Workflow{
@@ -100,24 +92,24 @@ func TestMySQLListWorkflows(t *testing.T) {
10092
Namespace: "default",
10193
UID: types.UID("test-uid-001"),
10294
CreationTimestamp: now,
103-
Labels: map[string]string{
104-
"workflows.argoproj.io/archive-strategy": "Persisted",
105-
"env": "test",
106-
},
95+
Labels: map[string]string{"env": "test"},
96+
Annotations: map[string]string{"note": "integration-test"},
10797
},
10898
Spec: wfv1.WorkflowSpec{
99+
Suspend: ptr.To(true),
109100
Arguments: wfv1.Arguments{
110101
Parameters: []wfv1.Parameter{
111102
{Name: "msg", Value: wfv1.AnyStringPtr("hello")},
112103
},
113104
},
114105
},
115106
Status: wfv1.WorkflowStatus{
116-
Phase: wfv1.WorkflowSucceeded,
117-
StartedAt: now,
118-
FinishedAt: now,
119-
Progress: "1/1",
120-
Message: "completed",
107+
Phase: wfv1.WorkflowSucceeded,
108+
StartedAt: now,
109+
FinishedAt: now,
110+
Progress: "1/1",
111+
Message: "completed",
112+
EstimatedDuration: wfv1.EstimatedDuration(30),
121113
},
122114
})
123115
require.NoError(t, err)
@@ -132,6 +124,10 @@ func TestMySQLListWorkflows(t *testing.T) {
132124
assert.Equal(t, wfv1.Progress("1/1"), wf.Status.Progress)
133125
assert.Equal(t, "completed", wf.Status.Message)
134126
assert.Equal(t, "test", wf.GetLabels()["env"])
127+
assert.Equal(t, "integration-test", wf.GetAnnotations()["note"])
128+
assert.Equal(t, ptr.To(true), wf.Spec.Suspend)
129+
assert.Equal(t, "hello", wf.Spec.Arguments.Parameters[0].Value.String())
130+
assert.Equal(t, wfv1.EstimatedDuration(30), wf.Status.EstimatedDuration)
135131
})
136132
}
137133
}

util/sqldb/mysql_test_helper.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package sqldb
2+
3+
// MySQLVariant defines a MySQL-compatible database image for integration testing.
4+
type MySQLVariant struct {
5+
Image string
6+
WaitMessage string
7+
}
8+
9+
// MySQLVariants contains the set of MySQL-compatible databases to test against.
10+
var MySQLVariants = map[string]MySQLVariant{
11+
"MySQL": {Image: "mysql:8.4", WaitMessage: "port: 3306 MySQL Community Server"},
12+
"MariaDB": {Image: "mariadb:11.4", WaitMessage: "mariadbd: ready for connections"},
13+
}

util/sqldb/sqldb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func createMySQLDBSessionWithCreds(cfg *config.MySQLConfig, persistPool *config.
202202
Addr: cfg.GetHostname(),
203203
DBName: cfg.Database,
204204
ParseTime: true,
205-
AllowNativePasswords: true,
205+
AllowNativePasswords: true, // Required for MariaDB which uses mysql_native_password by default
206206
Params: cfg.Options,
207207
}
208208
dsn := mysqlCfg.FormatDSN()

util/sqldb/sqldb_test.go

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
//go:build !windows
2+
13
package sqldb
24

35
import (
46
"context"
5-
"runtime"
67
"strconv"
78
"testing"
89
"time"
@@ -16,31 +17,27 @@ import (
1617
"github.com/argoproj/argo-workflows/v4/util/logging"
1718
)
1819

19-
type mysqlVariant struct {
20-
image string
21-
waitMessage string
22-
}
23-
24-
var mysqlVariants = map[string]mysqlVariant{
25-
"MySQL": {image: "mysql:8.4", waitMessage: "port: 3306 MySQL Community Server"},
26-
"MariaDB": {image: "mariadb:11.4", waitMessage: "mariadbd: ready for connections"},
27-
}
28-
29-
func setupMySQLContainer(ctx context.Context, t *testing.T, v mysqlVariant) (config.DBConfig, func()) {
20+
// setupMySQLContainer starts a MySQL or MariaDB container and returns the corresponding DBConfig.
21+
func setupMySQLContainer(ctx context.Context, t *testing.T, v MySQLVariant) config.DBConfig {
3022
t.Helper()
3123

3224
c, err := testmysql.Run(ctx,
33-
v.image,
25+
v.Image,
3426
testmysql.WithDatabase("argo"),
3527
testmysql.WithUsername("argo"),
3628
testmysql.WithPassword("argo"),
3729
testcontainers.WithWaitStrategy(
3830
wait.ForAll(
39-
wait.ForLog(v.waitMessage).WithStartupTimeout(60*time.Second),
31+
wait.ForLog(v.WaitMessage).WithStartupTimeout(60*time.Second),
4032
wait.ForListeningPort("3306/tcp"),
4133
)),
4234
)
4335
require.NoError(t, err)
36+
t.Cleanup(func() {
37+
if err := testcontainers.TerminateContainer(c); err != nil {
38+
t.Logf("failed to terminate container: %s", err)
39+
}
40+
})
4441

4542
host, err := c.Host(ctx)
4643
require.NoError(t, err)
@@ -57,27 +54,21 @@ func setupMySQLContainer(ctx context.Context, t *testing.T, v mysqlVariant) (con
5754
Port: port,
5855
},
5956
},
60-
}, func() { testcontainers.TerminateContainer(c) } //nolint:errcheck
57+
}
6158
}
6259

6360
// TestMySQLSessionConnect verifies that CreateDBSessionWithCreds can connect
6461
// to both MySQL and MariaDB. MariaDB requires AllowNativePasswords.
6562
func TestMySQLSessionConnect(t *testing.T) {
66-
if runtime.GOOS == "windows" {
67-
t.Skip("test requires Linux container")
68-
}
69-
70-
for name, variant := range mysqlVariants {
63+
for name, variant := range MySQLVariants {
7164
t.Run(name, func(t *testing.T) {
7265
ctx := logging.TestContext(t.Context())
73-
dbConfig, cleanup := setupMySQLContainer(ctx, t, variant)
74-
defer cleanup()
66+
dbConfig := setupMySQLContainer(ctx, t, variant)
7567

76-
sess, dbType, err := CreateDBSessionWithCreds(dbConfig, "argo", "argo")
68+
sess, _, err := CreateDBSessionWithCreds(dbConfig, "argo", "argo")
7769
require.NoError(t, err)
7870
defer sess.Close()
7971

80-
require.Equal(t, MySQL, dbType)
8172
require.NoError(t, sess.Ping())
8273
})
8374
}

0 commit comments

Comments
 (0)