1 change: 1 addition & 0 deletions .spelling
@@ -91,6 +91,7 @@ Lifecycle-Hook
 LitmusChaos
 MLOps
 Makefile
+MariaDB
 Metaflow
 MinIO
 Minikube
2 changes: 1 addition & 1 deletion docs/offloading-large-workflows.md
@@ -4,7 +4,7 @@

 Argo stores workflows as Kubernetes resources (i.e. within EtcD). This creates a limit to their size as resources must be under 1MB. Each resource includes the status of each node, which is stored in the `/status/nodes` field for the resource. This can be over 1MB. If this happens, we try and compress the node status and store it in `/status/compressedNodes`. If the status is still too large, we then try and store it in an SQL database.

-To enable this feature, configure a Postgres or MySQL database under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `nodeStatusOffLoad: true`.
+To enable this feature, configure a Postgres, MySQL, or MariaDB database under `persistence` in [your configuration](workflow-controller-configmap.yaml) and set `nodeStatusOffLoad: true`.

 ## FAQ

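The fallback order described in the offloading doc (inline, then compressed, then SQL) can be sketched roughly as follows. This is a minimal illustration, not the controller's code: `Storage`, `gzipSize`, `chooseStorage`, and the `limit` parameter are hypothetical names, and gzip is assumed as the compression scheme.

```go
package main

import (
	"bytes"
	"compress/gzip"
)

// Storage names the place a workflow's node status would end up.
// Hypothetical type for illustration; not part of Argo's API.
type Storage string

const (
	Inline     Storage = "/status/nodes"
	Compressed Storage = "/status/compressedNodes"
	Offloaded  Storage = "sql-database"
)

// gzipSize returns the size of data after gzip compression.
// Assumption: gzip stands in for whatever compression the controller uses.
func gzipSize(data []byte) (int, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return 0, err
	}
	if err := zw.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

// chooseStorage applies the fallback order: keep the node status inline if it
// fits under limit, otherwise try compression, otherwise offload to SQL.
func chooseStorage(nodes []byte, limit int) (Storage, error) {
	if len(nodes) <= limit {
		return Inline, nil
	}
	size, err := gzipSize(nodes)
	if err != nil {
		return "", err
	}
	if size <= limit {
		return Compressed, nil
	}
	return Offloaded, nil
}
```

Passing the limit as a parameter keeps the sketch easy to exercise with small inputs; the real limit comes from the size cap on Kubernetes resources mentioned above.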
2 changes: 1 addition & 1 deletion docs/synchronization-config.md
@@ -31,7 +31,7 @@ Database-based limits allow multiple workflow controllers (typically across diff

 Before you can manage database limits via the API, you must:

-1. Configure a PostgreSQL or MySQL database for synchronization (see [workflow synchronization](synchronization.md#database-configuration))
+1. Configure a PostgreSQL, MySQL, or MariaDB database for synchronization (see [workflow synchronization](synchronization.md#database-configuration))
 2. Enable the synchronization API in your workflow controller configuration

 ### Enable the API
2 changes: 1 addition & 1 deletion docs/synchronization.md
@@ -68,7 +68,7 @@ spec:
 Multiple controllers can share locks using a database as an intermediary.
 This would normally be used to share locks across multiple clusters, but can also be used to share locks across multiple controllers in the same cluster.

-To configure multiple controller locks, you need to set up a database (either PostgreSQL or MySQL) and [configure it](#database-configuration) in the workflow-controller-configmap ConfigMap.
+To configure multiple controller locks, you need to set up a database (either PostgreSQL, MySQL, or MariaDB) and [configure it](#database-configuration) in the workflow-controller-configmap ConfigMap.
 All controllers which want to share locks must share all of these tables.
 If you do not configure the database you will get an error if you try to use database locks.

2 changes: 1 addition & 1 deletion docs/workflow-archive.md
@@ -2,7 +2,7 @@

 > v2.5 and after

-If you want to keep completed workflows for a long time, you can use the workflow archive to save them in a Postgres (>=9.4) or MySQL (>= 5.7.8) database.
+If you want to keep completed workflows for a long time, you can use the workflow archive to save them in a Postgres (>=9.4), MySQL (>= 5.7.8), or MariaDB (>= 10.2) database.
 The workflow archive stores the status of the workflow, which pods have been executed, what was the result etc.
 The job logs of the workflow pods will not be archived.
 If you need to save the logs of the pods, you must setup an [artifact repository](artifact-repository-ref.md) according to [this doc](configure-artifact-repository.md).
16 changes: 8 additions & 8 deletions persist/sqldb/workflow_archive.go
@@ -184,14 +184,14 @@ func (r *workflowArchive) ListWorkflows(ctx context.Context, options sutils.List
 	baseSelector := s.SQL().Select("name", "namespace", "uid", "phase", "startedat", "finishedat", "creationtimestamp")
 	selectQuery := baseSelector.
 		Columns(
-			db.Raw("coalesce(workflow->'$.metadata.labels', '{}') as labels"),
-			db.Raw("coalesce(workflow->'$.metadata.annotations', '{}') as annotations"),
-			db.Raw("coalesce(workflow->>'$.status.progress', '') as progress"),
-			db.Raw("workflow->>'$.spec.suspend'"),
-			db.Raw("coalesce(workflow->>'$.spec.arguments', '{}') as arguments"),
-			db.Raw("coalesce(workflow->>'$.status.message', '') as message"),
-			db.Raw("coalesce(workflow->>'$.status.estimatedDuration', '0') as estimatedduration"),
-			db.Raw("coalesce(workflow->'$.status.resourcesDuration', '{}') as resourcesduration"),
+			db.Raw("coalesce(JSON_EXTRACT(workflow, '$.metadata.labels'), '{}') as labels"),
+			db.Raw("coalesce(JSON_EXTRACT(workflow, '$.metadata.annotations'), '{}') as annotations"),
+			db.Raw("coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.status.progress')), '') as progress"),
+			db.Raw("JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.spec.suspend')) as suspend"),
+			db.Raw("coalesce(JSON_EXTRACT(workflow, '$.spec.arguments'), '{}') as arguments"),
+			db.Raw("coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.status.message')), '') as message"),
+			db.Raw("coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow, '$.status.estimatedDuration')), '0') as estimatedduration"),
+			db.Raw("coalesce(JSON_EXTRACT(workflow, '$.status.resourcesDuration'), '{}') as resourcesduration"),
 		).
 		From(archiveTableName).
 		Where(r.clusterManagedNamespaceAndInstanceID())
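The hunk in `persist/sqldb/workflow_archive.go` replaces the `->` and `->>` shorthand with explicit `JSON_EXTRACT` and `JSON_UNQUOTE(JSON_EXTRACT(...))` calls, presumably because MariaDB does not accept the operator form. As a rough sketch of the mapping, the hypothetical helpers below (`jsonExtract`, `jsonExtractText`, and `coalesced` are not part of the PR) build the same expression strings. They assume the column names and paths are trusted constants, so no escaping is attempted.

```go
package main

import "fmt"

// jsonExtract returns the function-call form of the shorthand column->'path'.
// Hypothetical helper: inputs must be trusted constants, never user input.
func jsonExtract(column, path string) string {
	return fmt.Sprintf("JSON_EXTRACT(%s, '%s')", column, path)
}

// jsonExtractText returns the function-call form of column->>'path',
// which extracts the value and strips the surrounding JSON quotes.
func jsonExtractText(column, path string) string {
	return "JSON_UNQUOTE(" + jsonExtract(column, path) + ")"
}

// coalesced wraps expr with a fallback literal and a column alias,
// matching the shape of the expressions passed to db.Raw in the diff.
func coalesced(expr, fallback, alias string) string {
	return fmt.Sprintf("coalesce(%s, '%s') as %s", expr, fallback, alias)
}
```

For example, `coalesced(jsonExtractText("workflow", "$.status.progress"), "", "progress")` yields the same string as the new `progress` column expression in the hunk.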
132 changes: 132 additions & 0 deletions persist/sqldb/workflow_archive_mysql_test.go
@@ -0,0 +1,132 @@
+//go:build !windows
+
+package sqldb
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	testcontainers "github.com/testcontainers/testcontainers-go"
+	testmysql "github.com/testcontainers/testcontainers-go/modules/mysql"
+	"github.com/testcontainers/testcontainers-go/wait"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+
+	"github.com/argoproj/argo-workflows/v4/config"
+	wfv1 "github.com/argoproj/argo-workflows/v4/pkg/apis/workflow/v1alpha1"
+	sutils "github.com/argoproj/argo-workflows/v4/server/utils"
+	"github.com/argoproj/argo-workflows/v4/util/instanceid"
+	"github.com/argoproj/argo-workflows/v4/util/logging"
+	usqldb "github.com/argoproj/argo-workflows/v4/util/sqldb"
+)
+
+// setupMySQLArchiveTest starts a MySQL or MariaDB container, runs migrations, and returns a WorkflowArchive.
+func setupMySQLArchiveTest(ctx context.Context, t *testing.T, v usqldb.MySQLVariant) WorkflowArchive {
+	t.Helper()
+
+	c, err := testmysql.Run(ctx,
+		v.Image,
+		testmysql.WithDatabase("argo"),
+		testmysql.WithUsername("argo"),
+		testmysql.WithPassword("argo"),
+		testcontainers.WithWaitStrategy(
+			wait.ForAll(
+				wait.ForLog(v.WaitMessage).WithStartupTimeout(60*time.Second),
+				wait.ForListeningPort("3306/tcp"),
+			)),
+	)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		if termErr := testcontainers.TerminateContainer(c); termErr != nil {
+			t.Logf("failed to terminate container: %s", termErr)
+		}
+	})
+
+	host, err := c.Host(ctx)
+	require.NoError(t, err)
+	p, err := c.MappedPort(ctx, "3306/tcp")
+	require.NoError(t, err)
+	port, err := strconv.Atoi(p.Port())
+	require.NoError(t, err)
+
+	proxy, err := usqldb.NewSessionProxy(ctx, usqldb.SessionProxyConfig{
+		DBConfig: config.DBConfig{
+			MySQL: &config.MySQLConfig{
+				DatabaseConfig: config.DatabaseConfig{
+					Database: "argo",
+					Host:     host,
+					Port:     port,
+				},
+			},
+		},
+		Username: "argo",
+		Password: "argo",
+	})
+	require.NoError(t, err)
+
+	err = Migrate(ctx, proxy.Session(), "test", "argo_workflows", proxy.DBType())
+	require.NoError(t, err)
+
+	t.Cleanup(func() { proxy.Close() })
+
+	return NewWorkflowArchive(proxy, "test", "", instanceid.NewService(""))
+}
+
+// TestMySQLListWorkflows verifies that JSON_EXTRACT/JSON_UNQUOTE queries in
+// ListWorkflows execute correctly against both MySQL and MariaDB.
+func TestMySQLListWorkflows(t *testing.T) {
+	for name, variant := range usqldb.MySQLVariants {
+		t.Run(name, func(t *testing.T) {
+			ctx := logging.TestContext(t.Context())
+			archive := setupMySQLArchiveTest(ctx, t, variant)
+
+			now := metav1.Now()
+			err := archive.ArchiveWorkflow(ctx, &wfv1.Workflow{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "test-wf",
+					Namespace:         "default",
+					UID:               types.UID("test-uid-001"),
+					CreationTimestamp: now,
+					Labels:            map[string]string{"env": "test"},
+					Annotations:       map[string]string{"note": "integration-test"},
+				},
+				Spec: wfv1.WorkflowSpec{
+					Suspend: new(true),
+					Arguments: wfv1.Arguments{
+						Parameters: []wfv1.Parameter{
+							{Name: "msg", Value: wfv1.AnyStringPtr("hello")},
+						},
+					},
+				},
+				Status: wfv1.WorkflowStatus{
+					Phase:             wfv1.WorkflowSucceeded,
+					StartedAt:         now,
+					FinishedAt:        now,
+					Progress:          "1/1",
+					Message:           "completed",
+					EstimatedDuration: wfv1.EstimatedDuration(30),
+				},
+			})
+			require.NoError(t, err)
+
+			results, err := archive.ListWorkflows(ctx, sutils.ListOptions{Namespace: "default", Limit: 10})
+			require.NoError(t, err)
+			require.Len(t, results, 1)
+
+			wf := results[0]
+			assert.Equal(t, "test-wf", wf.Name)
+			assert.Equal(t, wfv1.WorkflowSucceeded, wf.Status.Phase)
+			assert.Equal(t, wfv1.Progress("1/1"), wf.Status.Progress)
+			assert.Equal(t, "completed", wf.Status.Message)
+			assert.Equal(t, "test", wf.GetLabels()["env"])
+			assert.Equal(t, "integration-test", wf.GetAnnotations()["note"])
+			assert.Equal(t, new(true), wf.Spec.Suspend)
+			assert.Equal(t, "hello", wf.Spec.Arguments.Parameters[0].Value.String())
+			assert.Equal(t, wfv1.EstimatedDuration(30), wf.Status.EstimatedDuration)
+		})
+	}
+}
13 changes: 13 additions & 0 deletions util/sqldb/mysql_test_helper.go
@@ -0,0 +1,13 @@
+package sqldb
+
+// MySQLVariant defines a MySQL-compatible database image for integration testing.
+type MySQLVariant struct {
+	Image       string
+	WaitMessage string
+}
+
+// MySQLVariants contains the set of MySQL-compatible databases to test against.
+var MySQLVariants = map[string]MySQLVariant{
+	"MySQL":   {Image: "mysql:8.4", WaitMessage: "port: 3306 MySQL Community Server"},
+	"MariaDB": {Image: "mariadb:11.4", WaitMessage: "mariadbd: ready for connections"},
+}
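Both test files range directly over the `MySQLVariants` map, and Go does not guarantee map iteration order, so the MySQL and MariaDB subtests may run in a different order from one run to the next. If a stable order is ever wanted (for example, to compare CI logs), one possible approach is to sort the keys first. The sketch below uses hypothetical names (`variant`, `sortedNames`) and is not part of the PR.

```go
package main

import "sort"

// variant mirrors the shape of MySQLVariant from the diff.
// Hypothetical local copy so the sketch is self-contained.
type variant struct {
	Image       string
	WaitMessage string
}

// sortedNames returns the map keys in ascending order so callers can
// iterate over the variants deterministically.
func sortedNames(variants map[string]variant) []string {
	names := make([]string, 0, len(variants))
	for name := range variants {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
```

A test loop would then read `for _, name := range sortedNames(variants) { ... }` and look up each variant by name.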
15 changes: 8 additions & 7 deletions util/sqldb/sqldb.go
@@ -196,13 +196,14 @@ func createPostGresDBSessionWithCreds(cfg *config.PostgreSQLConfig, persistPool
 func createMySQLDBSessionWithCreds(cfg *config.MySQLConfig, persistPool *config.ConnectionPool, username, password string) (db.Session, error) {
 	// Build MySQL DSN using mysql.Config to safely handle special characters in credentials
 	mysqlCfg := mysql.Config{
-		User:      username,
-		Passwd:    password,
-		Net:       "tcp",
-		Addr:      cfg.GetHostname(),
-		DBName:    cfg.Database,
-		ParseTime: true,
-		Params:    cfg.Options,
+		User:                 username,
+		Passwd:               password,
+		Net:                  "tcp",
+		Addr:                 cfg.GetHostname(),
+		DBName:               cfg.Database,
+		ParseTime:            true,
+		AllowNativePasswords: true, // Required for MariaDB which uses mysql_native_password by default
+		Params:               cfg.Options,
 	}
 	dsn := mysqlCfg.FormatDSN()

75 changes: 75 additions & 0 deletions util/sqldb/sqldb_test.go
@@ -0,0 +1,75 @@
+//go:build !windows
+
+package sqldb
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	testcontainers "github.com/testcontainers/testcontainers-go"
+	testmysql "github.com/testcontainers/testcontainers-go/modules/mysql"
+	"github.com/testcontainers/testcontainers-go/wait"
+
+	"github.com/argoproj/argo-workflows/v4/config"
+	"github.com/argoproj/argo-workflows/v4/util/logging"
+)
+
+// setupMySQLContainer starts a MySQL or MariaDB container and returns the corresponding DBConfig.
+func setupMySQLContainer(ctx context.Context, t *testing.T, v MySQLVariant) config.DBConfig {
+	t.Helper()
+
+	c, err := testmysql.Run(ctx,
+		v.Image,
+		testmysql.WithDatabase("argo"),
+		testmysql.WithUsername("argo"),
+		testmysql.WithPassword("argo"),
+		testcontainers.WithWaitStrategy(
+			wait.ForAll(
+				wait.ForLog(v.WaitMessage).WithStartupTimeout(60*time.Second),
+				wait.ForListeningPort("3306/tcp"),
+			)),
+	)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		if termErr := testcontainers.TerminateContainer(c); termErr != nil {
+			t.Logf("failed to terminate container: %s", termErr)
+		}
+	})
+
+	host, err := c.Host(ctx)
+	require.NoError(t, err)
+	p, err := c.MappedPort(ctx, "3306/tcp")
+	require.NoError(t, err)
+	port, err := strconv.Atoi(p.Port())
+	require.NoError(t, err)
+
+	return config.DBConfig{
+		MySQL: &config.MySQLConfig{
+			DatabaseConfig: config.DatabaseConfig{
+				Database: "argo",
+				Host:     host,
+				Port:     port,
+			},
+		},
+	}
+}
+
+// TestMySQLSessionConnect verifies that CreateDBSessionWithCreds can connect
+// to both MySQL and MariaDB. MariaDB requires AllowNativePasswords.
+func TestMySQLSessionConnect(t *testing.T) {
+	for name, variant := range MySQLVariants {
+		t.Run(name, func(t *testing.T) {
+			ctx := logging.TestContext(t.Context())
+			dbConfig := setupMySQLContainer(ctx, t, variant)
+
+			sess, _, err := CreateDBSessionWithCreds(dbConfig, "argo", "argo")
+			require.NoError(t, err)
+			defer sess.Close()
+
+			require.NoError(t, sess.Ping())
+		})
+	}
+}