Skip to content

Commit d38deb2

Browse files
committed
Check if its pgconn.ConnectError{} instead of doing ping.
1 parent 7e6eead commit d38deb2

3 files changed

Lines changed: 39 additions & 21 deletions

File tree

internal/sinks/ca.crt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIDPzCCAiegAwIBAgIUeENQlQFVH5h7HszFJLLWo+KCQwQwDQYJKoZIhvcNAQEL
3+
BQAwEjEQMA4GA1UEAwwHcGd3YXRjaDAeFw0yNTA2MTEwNTI1MDJaFw0zNTA2MDkw
4+
NTI1MDJaMBIxEDAOBgNVBAMMB3Bnd2F0Y2gwggEiMA0GCSqGSIb3DQEBAQUAA4IB
5+
DwAwggEKAoIBAQDtTW+kyvb3Y5OaYlriKp8HkHt95lfOgxQNZQfiREfEyLWU59bx
6+
0ZIvFmejK38Qc0dlca9d+5tEkxotsbggJflLljfmnzhxsuZpr8SjmDd1m8XSo0IA
7+
oDlVbKO6SZMlsyq3QrAOYAjG1LTPlATqvGAOs9NfFonjwoPXCjIwSfa+wexe5dRD
8+
gJ114AKXw3ck5ZQ4Pw+w5ylgNSfVl548WY9DSOA+6HlZ17MYA1qmMOTwKae5fmsc
9+
xlRPoIV3EJrKas7VlbebDOXOSXDV+9aMW6ox1xanUUDUgabzkzntfmuOttaIX1g1
10+
nSMuHa7EEQF7lxgdg9OU8i/jygdlGcgBUYjtAgMBAAGjgYwwgYkwDAYDVR0TBAUw
11+
AwEB/zAdBgNVHQ4EFgQUKGJs7OXINd2WL6X2meH90eINoJQwTQYDVR0jBEYwRIAU
12+
KGJs7OXINd2WL6X2meH90eINoJShFqQUMBIxEDAOBgNVBAMMB3Bnd2F0Y2iCFHhD
13+
UJUBVR+Yex7MxSSy1qPigkMEMAsGA1UdDwQEAwIBBjANBgkqhkiG9w0BAQsFAAOC
14+
AQEABdY/4rsgMu+sCqEdacNzHqAz9X1ew37y1UONngm/7LPqbQrzzg/fBvOOJLcd
15+
IzMJPtpdwokPYOW29jw/hY4R1RWr8012zc8Z0GsuDR7I/Z2Hww7tzYhf1H5mjy1d
16+
eQDhHNpsSb5pHLoPft5O0sT/0WqAlKWPb2KmSoAio8jE2BSUTK3ZgE0yJIikONon
17+
HCWOlNCWx+RsyPoRnQqbpVa+SmGBqpiyHchpZ8sFPe+pgPu+8u921lJ0PRvmfp7L
18+
4YZIaM8LQAV8FWk2VLXmsqYUJYYLAXCG6Unkx1oIOtq1AyAoXHCl/3hKbCeXIrgA
19+
Cs5qN+ZUHRdKff5gFpraKtHKkw==
20+
-----END CERTIFICATE-----

internal/sinks/postgres.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,8 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
438438
rowsBatched += n
439439
if err != nil {
440440
logger.Error(err)
441-
if err := pgw.sinkDb.Ping(pgw.ctx); err != nil {
442-
logger.WithError(err).Error("Sink db is not reachable, dropping cached measurements")
441+
if _, ok := err.(*pgconn.ConnectError); ok {
442+
logger.Errorf("Sink DB not reachable, dropping %d cached measurements", len(msgs))
443443
break
444444
}
445445
if PgError, ok := err.(*pgconn.PgError); ok {

internal/sinks/postgres_test.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,19 +1083,25 @@ func TestEnsureMetricDbnameTime_IdempotentAcrossRestarts(t *testing.T) {
10831083
}
10841084

10851085
func TestFlush_CopyFromFailsAndPingSucceeds(t *testing.T) {
1086-
a := assert.New(t)
1087-
conn, err := pgxmock.NewPool()
1088-
a.NoError(err)
1089-
defer conn.Close()
1090-
1091-
pgw := &PostgresWriter{
1092-
ctx: ctx,
1093-
sinkDb: conn,
1094-
opts: &CmdOpts{PartitionInterval: "1 hour"},
1095-
partitionMapMetricDbname: make(map[string]map[string]ExistingPartitionInfo),
1096-
metricSchema: DbStorageSchemaPostgres,
1086+
r := require.New(t)
1087+
1088+
pgContainer, pgTearDown, err := testutil.SetupPostgresContainer()
1089+
r.NoError(err)
1090+
1091+
connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
1092+
r.NoError(err)
1093+
1094+
opts := &CmdOpts{
1095+
PartitionInterval: "1 day",
1096+
RetentionInterval: "7 days",
1097+
MaintenanceInterval: "12 hours",
1098+
BatchingDelay: time.Second,
10971099
}
10981100

1101+
pgw, err := NewPostgresWriter(ctx, connStr, opts)
1102+
r.NoError(err)
1103+
pgTearDown()
1104+
10991105
now := time.Now()
11001106
msgs := []metrics.MeasurementEnvelope{
11011107
{
@@ -1107,13 +1113,5 @@ func TestFlush_CopyFromFailsAndPingSucceeds(t *testing.T) {
11071113
},
11081114
}
11091115

1110-
conn.ExpectQuery("(?i)SELECT.*ensure_partition_metric_dbname_time").
1111-
WithArgs("test_metric", "test_db", pgxmock.AnyArg(), "1 hour").
1112-
WillReturnRows(pgxmock.NewRows([]string{"start_time", "end_time"}).AddRow(now.Add(-time.Hour), now.Add(time.Hour)))
1113-
conn.ExpectCopyFrom(pgx.Identifier{"test_metric"}, targetColumns[:]).WillReturnError(errors.New("copy failed"))
1114-
conn.ExpectPing().WillReturnError(nil)
1115-
11161116
pgw.flush(msgs)
1117-
1118-
a.NoError(conn.ExpectationsWereMet())
11191117
}

0 commit comments

Comments
 (0)