Skip to content

Commit 20a1e76

Browse files
committed
Break from the flush() loop if the db is down
1 parent b28a3d7 commit 20a1e76

2 files changed

Lines changed: 40 additions & 0 deletions

File tree

internal/sinks/postgres.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,10 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
438438
rowsBatched += n
439439
if err != nil {
440440
logger.Error(err)
441+
if err := pgw.sinkDb.Ping(pgw.ctx); err != nil {
442+
logger.WithError(err).Error("Sink db is not reachable, dropping cached measurements")
443+
break
444+
}
441445
if PgError, ok := err.(*pgconn.PgError); ok {
442446
pgw.forceRecreatePartitions = PgError.Code == "23514"
443447
}

internal/sinks/postgres_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,3 +1081,39 @@ func TestEnsureMetricDbnameTime_IdempotentAcrossRestarts(t *testing.T) {
10811081
}
10821082
}
10831083
}
1084+
1085+
func TestFlush_CopyFromFailsAndPingSucceeds(t *testing.T) {
1086+
a := assert.New(t)
1087+
conn, err := pgxmock.NewPool()
1088+
a.NoError(err)
1089+
defer conn.Close()
1090+
1091+
pgw := &PostgresWriter{
1092+
ctx: ctx,
1093+
sinkDb: conn,
1094+
opts: &CmdOpts{PartitionInterval: "1 hour"},
1095+
partitionMapMetricDbname: make(map[string]map[string]ExistingPartitionInfo),
1096+
metricSchema: DbStorageSchemaPostgres,
1097+
}
1098+
1099+
now := time.Now()
1100+
msgs := []metrics.MeasurementEnvelope{
1101+
{
1102+
MetricName: "test_metric",
1103+
Data: metrics.Measurements{
1104+
{"epoch_ns": now.UnixNano(), "value": 1},
1105+
},
1106+
DBName: "test_db",
1107+
},
1108+
}
1109+
1110+
conn.ExpectQuery("(?i)SELECT.*ensure_partition_metric_dbname_time").
1111+
WithArgs("test_metric", "test_db", pgxmock.AnyArg(), "1 hour").
1112+
WillReturnRows(pgxmock.NewRows([]string{"start_time", "end_time"}).AddRow(now.Add(-time.Hour), now.Add(time.Hour)))
1113+
conn.ExpectCopyFrom(pgx.Identifier{"test_metric"}, targetColumns[:]).WillReturnError(errors.New("copy failed"))
1114+
conn.ExpectPing().WillReturnError(nil)
1115+
1116+
pgw.flush(msgs)
1117+
1118+
a.NoError(conn.ExpectationsWereMet())
1119+
}

0 commit comments

Comments
 (0)