Skip to content

Commit 495c410

Browse files
committed
Check if its pgconn.ConnectError{} instead of doing ping.
1 parent 7e6eead commit 495c410

2 files changed

Lines changed: 19 additions & 21 deletions

File tree

internal/sinks/postgres.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,8 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
438438
rowsBatched += n
439439
if err != nil {
440440
logger.Error(err)
441-
if err := pgw.sinkDb.Ping(pgw.ctx); err != nil {
442-
logger.WithError(err).Error("Sink db is not reachable, dropping cached measurements")
441+
if _, ok := err.(*pgconn.ConnectError); ok {
442+
logger.Errorf("Sink DB not reachable, dropping %d cached measurements", len(msgs))
443443
break
444444
}
445445
if PgError, ok := err.(*pgconn.PgError); ok {

internal/sinks/postgres_test.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,19 +1083,25 @@ func TestEnsureMetricDbnameTime_IdempotentAcrossRestarts(t *testing.T) {
10831083
}
10841084

10851085
func TestFlush_CopyFromFailsAndPingSucceeds(t *testing.T) {
1086-
a := assert.New(t)
1087-
conn, err := pgxmock.NewPool()
1088-
a.NoError(err)
1089-
defer conn.Close()
1090-
1091-
pgw := &PostgresWriter{
1092-
ctx: ctx,
1093-
sinkDb: conn,
1094-
opts: &CmdOpts{PartitionInterval: "1 hour"},
1095-
partitionMapMetricDbname: make(map[string]map[string]ExistingPartitionInfo),
1096-
metricSchema: DbStorageSchemaPostgres,
1086+
r := require.New(t)
1087+
1088+
pgContainer, pgTearDown, err := testutil.SetupPostgresContainer()
1089+
r.NoError(err)
1090+
1091+
connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
1092+
r.NoError(err)
1093+
1094+
opts := &CmdOpts{
1095+
PartitionInterval: "1 day",
1096+
RetentionInterval: "7 days",
1097+
MaintenanceInterval: "12 hours",
1098+
BatchingDelay: time.Second,
10971099
}
10981100

1101+
pgw, err := NewPostgresWriter(ctx, connStr, opts)
1102+
r.NoError(err)
1103+
pgTearDown()
1104+
10991105
now := time.Now()
11001106
msgs := []metrics.MeasurementEnvelope{
11011107
{
@@ -1107,13 +1113,5 @@ func TestFlush_CopyFromFailsAndPingSucceeds(t *testing.T) {
11071113
},
11081114
}
11091115

1110-
conn.ExpectQuery("(?i)SELECT.*ensure_partition_metric_dbname_time").
1111-
WithArgs("test_metric", "test_db", pgxmock.AnyArg(), "1 hour").
1112-
WillReturnRows(pgxmock.NewRows([]string{"start_time", "end_time"}).AddRow(now.Add(-time.Hour), now.Add(time.Hour)))
1113-
conn.ExpectCopyFrom(pgx.Identifier{"test_metric"}, targetColumns[:]).WillReturnError(errors.New("copy failed"))
1114-
conn.ExpectPing().WillReturnError(nil)
1115-
11161116
pgw.flush(msgs)
1117-
1118-
a.NoError(conn.ExpectationsWereMet())
11191117
}

0 commit comments

Comments
 (0)