Skip to content

Commit 905eb1d

Browse files
fix: Remove nodecomposite warning logs when using dawgs graph driver BED-8293 (#2798)
* added new connection pool * ran pfc * starting moving db config back to bhce from dawgs * chore: move db configuration back under BloodHound chore: add db pool generation to BloodHound * adding original fix of using pgxpool.New to avoid errors * chore: fix for graphdb vs appdb pools * refactored dbpool * updated outdated comment * another refactor! passing the pool config vs the connection string * bumped dawgs version * pfc * pfc again --------- Co-authored-by: Alyx Holms <aholms@specterops.io>
1 parent 8c8d768 commit 905eb1d

23 files changed

Lines changed: 263 additions & 102 deletions

File tree

cmd/api/src/api/dbpool/dbpool.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2026 Specter Ops, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// SPDX-License-Identifier: Apache-2.0
16+
package dbpool
17+
18+
import (
19+
"context"
20+
"time"
21+
22+
"github.com/jackc/pgx/v5"
23+
"github.com/jackc/pgx/v5/pgxpool"
24+
"github.com/specterops/bloodhound/cmd/api/src/config"
25+
"github.com/specterops/dawgs/drivers/pg"
26+
)
27+
28+
const (
29+
poolInitConnectionTimeout = time.Second * 10
30+
)
31+
32+
func newPoolCfg(cfg config.DatabaseConfiguration) (*pgxpool.Config, error) {
33+
poolCfg, err := pgxpool.ParseConfig(cfg.PostgreSQLConnectionString())
34+
if err != nil {
35+
return nil, err
36+
}
37+
38+
// TODO: Min and Max connections for the pool should be configurable
39+
poolCfg.MinConns = 5
40+
poolCfg.MaxConns = 50
41+
42+
if cfg.EnableRDSIAMAuth {
43+
// Only enable the BeforeConnect handler if RDS IAM Auth is enabled
44+
poolCfg.BeforeConnect = func(ctx context.Context, connCfg *pgx.ConnConfig) error {
45+
if newPoolCfg, err := pgxpool.ParseConfig(cfg.RDSIAMAuthConnectionString()); err != nil {
46+
return err
47+
} else {
48+
connCfg.Host = newPoolCfg.ConnConfig.Host
49+
connCfg.Port = newPoolCfg.ConnConfig.Port
50+
51+
connCfg.User = newPoolCfg.ConnConfig.User
52+
connCfg.Password = newPoolCfg.ConnConfig.Password
53+
connCfg.Database = newPoolCfg.ConnConfig.Database
54+
}
55+
56+
return nil
57+
}
58+
}
59+
60+
return poolCfg, nil
61+
}
62+
63+
func NewDawgsPool(cfg config.DatabaseConfiguration) (*pgxpool.Pool, error) {
64+
if poolCfg, err := newPoolCfg(cfg); err != nil {
65+
return nil, err
66+
} else {
67+
return pg.NewPool(poolCfg)
68+
}
69+
}
70+
71+
func NewAppPool(cfg config.DatabaseConfiguration) (*pgxpool.Pool, error) {
72+
poolCtx, done := context.WithTimeout(context.Background(), poolInitConnectionTimeout)
73+
defer done()
74+
75+
if poolCfg, err := newPoolCfg(cfg); err != nil {
76+
return nil, err
77+
} else {
78+
return pgxpool.NewWithConfig(poolCtx, poolCfg)
79+
}
80+
}

cmd/api/src/api/tools/pg.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/neo4j/neo4j-go-driver/v5/neo4j/dbtype"
2727
"github.com/specterops/bloodhound/cmd/api/src/api"
28+
"github.com/specterops/bloodhound/cmd/api/src/api/dbpool"
2829
"github.com/specterops/bloodhound/cmd/api/src/config"
2930
"github.com/specterops/bloodhound/packages/go/bhlog/attr"
3031
"github.com/specterops/bloodhound/packages/go/bhlog/measure"
@@ -255,7 +256,7 @@ type PGMigrator struct {
255256
migrationCancelFunc func()
256257
State MigratorState
257258
lock *sync.Mutex
258-
Cfg config.Configuration
259+
cfg config.Configuration
259260
}
260261

261262
func NewPGMigrator(serverCtx context.Context, cfg config.Configuration, graphSchema graph.Schema, graphDBSwitch *graph.DatabaseSwitch) *PGMigrator {
@@ -265,7 +266,7 @@ func NewPGMigrator(serverCtx context.Context, cfg config.Configuration, graphSch
265266
ServerCtx: serverCtx,
266267
State: StateIdle,
267268
lock: &sync.Mutex{},
268-
Cfg: cfg,
269+
cfg: cfg,
269270
}
270271
}
271272

@@ -297,7 +298,7 @@ func (s *PGMigrator) SwitchPostgreSQL(response http.ResponseWriter, request *htt
297298
}, http.StatusInternalServerError, response)
298299
} else if err := pgDB.AssertSchema(request.Context(), s.graphSchema); err != nil {
299300
slog.ErrorContext(request.Context(), "Unable to assert graph schema in PostgreSQL", attr.Error(err))
300-
} else if err := SetGraphDriver(request.Context(), s.Cfg, pg.DriverName); err != nil {
301+
} else if err := SetGraphDriver(request.Context(), s.cfg, pg.DriverName); err != nil {
301302
api.WriteJSONResponse(request.Context(), map[string]any{
302303
"error": fmt.Errorf("failed updating graph database driver preferences: %w", err),
303304
}, http.StatusInternalServerError, response)
@@ -314,7 +315,7 @@ func (s *PGMigrator) SwitchNeo4j(response http.ResponseWriter, request *http.Req
314315
api.WriteJSONResponse(request.Context(), map[string]any{
315316
"error": fmt.Errorf("failed connecting to Neo4j: %w", err),
316317
}, http.StatusInternalServerError, response)
317-
} else if err := SetGraphDriver(request.Context(), s.Cfg, neo4j.DriverName); err != nil {
318+
} else if err := SetGraphDriver(request.Context(), s.cfg, neo4j.DriverName); err != nil {
318319
api.WriteJSONResponse(request.Context(), map[string]any{
319320
"error": fmt.Errorf("failed updating graph database driver preferences: %w", err),
320321
}, http.StatusInternalServerError, response)
@@ -449,12 +450,12 @@ func (s *PGMigrator) MigrationStatus(response http.ResponseWriter, request *http
449450
}
450451

451452
func (s *PGMigrator) OpenPostgresGraphConnection() (graph.Database, error) {
452-
if pool, err := pg.NewPool(s.Cfg.Database); err != nil {
453+
if pool, err := dbpool.NewDawgsPool(s.cfg.Database); err != nil {
453454
return nil, err
454455
} else {
455456
return dawgs.Open(s.ServerCtx, pg.DriverName, dawgs.Config{
456457
GraphQueryMemoryLimit: size.Gibibyte,
457-
ConnectionString: s.Cfg.Database.PostgreSQLConnectionString(),
458+
ConnectionString: s.cfg.Database.PostgreSQLConnectionString(),
458459
Pool: pool,
459460
})
460461
}
@@ -463,6 +464,6 @@ func (s *PGMigrator) OpenPostgresGraphConnection() (graph.Database, error) {
463464
func (s *PGMigrator) OpenNeo4jGraphConnection() (graph.Database, error) {
464465
return dawgs.Open(s.ServerCtx, neo4j.DriverName, dawgs.Config{
465466
GraphQueryMemoryLimit: size.Gibibyte,
466-
ConnectionString: s.Cfg.Neo4J.Neo4jConnectionString(),
467+
ConnectionString: s.cfg.Neo4J.Neo4JConnectionString(),
467468
})
468469
}

cmd/api/src/bootstrap/util.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"os"
2424

2525
"github.com/jackc/pgx/v5/pgxpool"
26+
"github.com/specterops/bloodhound/cmd/api/src/api/dbpool"
2627
"github.com/specterops/bloodhound/cmd/api/src/api/tools"
2728
"github.com/specterops/bloodhound/cmd/api/src/config"
2829
"github.com/specterops/dawgs"
@@ -92,13 +93,13 @@ func ConnectGraph(ctx context.Context, cfg config.Configuration) (*graph.Databas
9293
switch driverName {
9394
case neo4j.DriverName:
9495
slog.InfoContext(ctx, "Connecting to graph using Neo4j")
95-
connectionString = cfg.Neo4J.Neo4jConnectionString()
96+
connectionString = cfg.Neo4J.Neo4JConnectionString()
9697

9798
case pg.DriverName:
9899
slog.InfoContext(ctx, "Connecting to graph using PostgreSQL")
99100
connectionString = cfg.Database.PostgreSQLConnectionString()
100101

101-
pool, err = pg.NewPool(cfg.Database)
102+
pool, err = dbpool.NewDawgsPool(cfg.Database)
102103
if err != nil {
103104
return nil, err
104105
}

cmd/api/src/cmd/dawgs-harness/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ import (
2929

3030
"github.com/jackc/pgx/v5/pgxpool"
3131
"github.com/jedib0t/go-pretty/v6/table"
32+
"github.com/specterops/bloodhound/cmd/api/src/api/dbpool"
3233
"github.com/specterops/bloodhound/cmd/api/src/cmd/dawgs-harness/tests"
3334
"github.com/specterops/bloodhound/cmd/api/src/config"
3435
"github.com/specterops/bloodhound/packages/go/bhlog"
3536
schema "github.com/specterops/bloodhound/packages/go/graphschema"
3637
"github.com/specterops/dawgs"
37-
"github.com/specterops/dawgs/drivers"
3838
"github.com/specterops/dawgs/drivers/neo4j"
3939
"github.com/specterops/dawgs/drivers/pg"
4040
"github.com/specterops/dawgs/graph"
@@ -46,14 +46,14 @@ func fatalf(format string, args ...any) {
4646
os.Exit(1)
4747
}
4848

49-
func RunTestSuite(ctx context.Context, connectionStr, driverName string, cfg drivers.DatabaseConfiguration) tests.TestSuite {
49+
func RunTestSuite(ctx context.Context, connectionStr, driverName string, cfg config.DatabaseConfiguration) tests.TestSuite {
5050
var (
5151
pool *pgxpool.Pool
5252
err error
5353
)
5454

5555
if driverName == pg.DriverName {
56-
pool, err = pg.NewPool(cfg)
56+
pool, err = dbpool.NewDawgsPool(cfg)
5757
if err != nil {
5858
fatalf("Failed creating a new pgxpool: %s", err)
5959
}

cmd/api/src/config/config.go

Lines changed: 104 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,26 @@
1717
package config
1818

1919
import (
20+
"context"
2021
"encoding/base64"
2122
"encoding/json"
2223
"errors"
2324
"fmt"
2425
"log/slog"
26+
"net"
27+
"net/url"
2528
"os"
2629
"path/filepath"
2730
"regexp"
2831
"strconv"
2932
"strings"
3033

34+
awsConfig "github.com/aws/aws-sdk-go-v2/config"
35+
"github.com/aws/aws-sdk-go-v2/feature/rds/auth"
36+
3137
"github.com/specterops/bloodhound/cmd/api/src/serde"
3238
"github.com/specterops/bloodhound/packages/go/bhlog/attr"
3339
"github.com/specterops/bloodhound/packages/go/crypto"
34-
dawgs "github.com/specterops/dawgs/drivers"
3540
)
3641

3742
const (
@@ -51,6 +56,71 @@ func (s TLSConfiguration) Enabled() bool {
5156
return s.CertFile != "" && s.KeyFile != ""
5257
}
5358

59+
type DatabaseConfiguration struct {
60+
Connection string `json:"connection"`
61+
Address string `json:"addr"`
62+
Database string `json:"database"`
63+
Username string `json:"username"`
64+
Secret string `json:"secret"`
65+
MaxConcurrentSessions int `json:"max_concurrent_sessions"`
66+
EnableRDSIAMAuth bool `json:"enable_rds_iam_auth"`
67+
}
68+
69+
func (s DatabaseConfiguration) PostgreSQLConnectionString() string {
70+
if s.Connection != "" {
71+
return s.Connection
72+
}
73+
74+
return fmt.Sprintf("postgresql://%s:%s@%s/%s", s.Username, s.Secret, s.Address, s.Database)
75+
}
76+
77+
func (s DatabaseConfiguration) Neo4JConnectionString() string {
78+
if s.Connection != "" {
79+
return s.Connection
80+
}
81+
82+
return fmt.Sprintf("neo4j://%s:%s@%s/%s", s.Username, s.Secret, s.Address, s.Database)
83+
}
84+
85+
func (s DatabaseConfiguration) RDSIAMAuthConnectionString() string {
86+
if cfg, err := awsConfig.LoadDefaultConfig(context.TODO()); err != nil {
87+
slog.Error("AWS Config Loading Error", slog.String("err", err.Error()))
88+
} else {
89+
// Must use instance endpoint with IAM auth
90+
endpoint := s.LookupEndpoint()
91+
92+
slog.Info("Requesting RDS IAM Auth Token")
93+
94+
if authenticationToken, err := auth.BuildAuthToken(context.TODO(), endpoint, cfg.Region, s.Username, cfg.Credentials); err != nil {
95+
slog.Error("RDS IAM Auth Token Request Error", slog.String("err", err.Error()))
96+
} else {
97+
slog.Info("RDS IAM Auth Token Created")
98+
return fmt.Sprintf("postgresql://%s:%s@%s/%s", s.Username, url.QueryEscape(authenticationToken), endpoint, s.Database)
99+
}
100+
}
101+
102+
slog.Warn("Failed to create IAM auth token. Falling back to default Postgres connection string")
103+
return s.PostgreSQLConnectionString()
104+
}
105+
106+
func (s DatabaseConfiguration) LookupEndpoint() string {
107+
host, port, err := net.SplitHostPort(s.Address)
108+
if err != nil {
109+
slog.Warn("Missing port in address. Using default port 5432.", slog.String("err", err.Error()))
110+
host = s.Address
111+
port = "5432"
112+
}
113+
114+
if hostCName, err := net.DefaultResolver.LookupCNAME(context.TODO(), host); err != nil {
115+
slog.Warn("Error looking up CNAME for DB host. Using original address.", slog.String("err", err.Error()))
116+
} else {
117+
host = hostCName
118+
}
119+
120+
// Instance endpoint always returns with a trailing '.'
121+
return net.JoinHostPort(strings.TrimSuffix(host, "."), port)
122+
}
123+
54124
type CollectorManifest struct {
55125
Latest string `json:"latest"`
56126
Versions []CollectorVersion `json:"versions"`
@@ -111,39 +181,39 @@ type DefaultAdminConfiguration struct {
111181
}
112182

113183
type Configuration struct {
114-
Version int `json:"version"`
115-
BindAddress string `json:"bind_addr"`
116-
SlowQueryThreshold int64 `json:"slow_query_threshold"`
117-
MaxGraphQueryCacheSize int `json:"max_graphdb_cache_size"`
118-
MaxAPICacheSize int `json:"max_api_cache_size"`
119-
MetricsPort string `json:"metrics_port"`
120-
RootURL serde.URL `json:"root_url"`
121-
WorkDir string `json:"work_dir"`
122-
LogLevel string `json:"log_level"`
123-
LogPath string `json:"log_path"`
124-
TLS TLSConfiguration `json:"tls"`
125-
GraphDriver string `json:"graph_driver"`
126-
Database dawgs.DatabaseConfiguration `json:"database"`
127-
Neo4J dawgs.DatabaseConfiguration `json:"neo4j"`
128-
Crypto CryptoConfiguration `json:"crypto"`
129-
SAML SAMLConfiguration `json:"saml"`
130-
DefaultAdmin DefaultAdminConfiguration `json:"default_admin"`
131-
CollectorsBucketURL serde.URL `json:"collectors_bucket_url"`
132-
CollectorsBasePath string `json:"collectors_base_path"`
133-
DatapipeInterval int `json:"datapipe_interval"`
134-
EnableStartupWaitPeriod bool `json:"enable_startup_wait_period"`
135-
EnableAPILogging bool `json:"enable_api_logging"`
136-
EnableCypherMutations bool `json:"enable_cypher_mutations"`
137-
DisableAnalysis bool `json:"disable_analysis"`
138-
DisableCypherComplexityLimit bool `json:"disable_cypher_complexity_limit"`
139-
DisableIngest bool `json:"disable_ingest"`
140-
DisableMigrations bool `json:"disable_migrations"`
141-
GraphQueryMemoryLimit uint16 `json:"graph_query_memory_limit"`
142-
EnableTextLogger bool `json:"enable_text_logger"`
143-
RecreateDefaultAdmin bool `json:"recreate_default_admin"`
144-
EnableUserAnalytics bool `json:"enable_user_analytics"`
145-
ForceDownloadEmbeddedCollectors bool `json:"force_download_embedded_collectors"`
146-
EnableAuditLogStdout bool `json:"enable_audit_log_stdout"`
184+
Version int `json:"version"`
185+
BindAddress string `json:"bind_addr"`
186+
SlowQueryThreshold int64 `json:"slow_query_threshold"`
187+
MaxGraphQueryCacheSize int `json:"max_graphdb_cache_size"`
188+
MaxAPICacheSize int `json:"max_api_cache_size"`
189+
MetricsPort string `json:"metrics_port"`
190+
RootURL serde.URL `json:"root_url"`
191+
WorkDir string `json:"work_dir"`
192+
LogLevel string `json:"log_level"`
193+
LogPath string `json:"log_path"`
194+
TLS TLSConfiguration `json:"tls"`
195+
GraphDriver string `json:"graph_driver"`
196+
Database DatabaseConfiguration `json:"database"`
197+
Neo4J DatabaseConfiguration `json:"neo4j"`
198+
Crypto CryptoConfiguration `json:"crypto"`
199+
SAML SAMLConfiguration `json:"saml"`
200+
DefaultAdmin DefaultAdminConfiguration `json:"default_admin"`
201+
CollectorsBucketURL serde.URL `json:"collectors_bucket_url"`
202+
CollectorsBasePath string `json:"collectors_base_path"`
203+
DatapipeInterval int `json:"datapipe_interval"`
204+
EnableStartupWaitPeriod bool `json:"enable_startup_wait_period"`
205+
EnableAPILogging bool `json:"enable_api_logging"`
206+
EnableCypherMutations bool `json:"enable_cypher_mutations"`
207+
DisableAnalysis bool `json:"disable_analysis"`
208+
DisableCypherComplexityLimit bool `json:"disable_cypher_complexity_limit"`
209+
DisableIngest bool `json:"disable_ingest"`
210+
DisableMigrations bool `json:"disable_migrations"`
211+
GraphQueryMemoryLimit uint16 `json:"graph_query_memory_limit"`
212+
EnableTextLogger bool `json:"enable_text_logger"`
213+
RecreateDefaultAdmin bool `json:"recreate_default_admin"`
214+
EnableUserAnalytics bool `json:"enable_user_analytics"`
215+
ForceDownloadEmbeddedCollectors bool `json:"force_download_embedded_collectors"`
216+
EnableAuditLogStdout bool `json:"enable_audit_log_stdout"`
147217
}
148218

149219
func (s Configuration) TempDirectory() string {

cmd/api/src/config/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestSetValuesFromEnv(t *testing.T) {
7070
"bhe_database_secret=supersecretpassword",
7171
}))
7272

73-
assert.Equal(t, "neo4j://neo4j:neo4jj@localhost:7070/neo4j", cfg.Neo4J.Neo4jConnectionString())
73+
assert.Equal(t, "neo4j://neo4j:neo4jj@localhost:7070/neo4j", cfg.Neo4J.Neo4JConnectionString())
7474
assert.Equal(t, "postgresql://bhe:supersecretpassword@localhost:5432/bhe", cfg.Database.PostgreSQLConnectionString())
7575
})
7676

0 commit comments

Comments
 (0)