Skip to content

Commit e0d50bd

Browse files
author
abhishek-dubey_sreeram
committed
updating connections in-memory using config-api
1 parent 1c8720e commit e0d50bd

3 files changed

Lines changed: 169 additions & 65 deletions

File tree

fdw.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,6 @@ func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double,
162162
FdwError(sperr.WrapWithMessage(err, "failed to process options"))
163163
}
164164

165-
// reload connection config
166-
// TODO remove need for fdw to load connection config
167-
_, err = pluginHub.LoadConnectionConfig()
168-
if err != nil {
169-
log.Printf("[ERROR] LoadConnectionConfig failed %v ", err)
170-
FdwError(err)
171-
return
172-
}
173-
174165
tableOpts := GetFTableOptions(types.Oid(state.foreigntableid))
175166

176167
// Extract trace context if available

hub/connection_factory.go

Lines changed: 124 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"sync"
88

99
"github.com/turbot/pipe-fittings/v2/utils"
10+
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
11+
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
1012
"github.com/turbot/steampipe/v2/pkg/pluginmanager"
13+
pb "github.com/turbot/steampipe/v2/pkg/pluginmanager_service/grpc/proto"
1114

12-
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
1315
"github.com/turbot/steampipe/v2/pkg/steampipeconfig"
1416
)
1517

@@ -32,11 +34,25 @@ func newConnectionFactory(hub *RemoteHub) *connectionFactory {
3234
// extract the plugin FQN and connection name from a map key
3335
func (f *connectionFactory) parsePluginKey(key string) (pluginFQN, connectionName string) {
3436
split := strings.Split(key, keySeparator)
37+
if len(split) < 2 {
38+
return "", ""
39+
}
3540
pluginFQN = split[0]
3641
connectionName = split[1]
3742
return
3843
}
3944

45+
// NOTE: caller must hold f.connectionLock
46+
func (f *connectionFactory) getByConnectionName(connectionName string) (*steampipeconfig.ConnectionPlugin, bool) {
47+
for key, connectionPlugin := range f.connectionPlugins {
48+
_, existingConnection := f.parsePluginKey(key)
49+
if existingConnection == connectionName {
50+
return connectionPlugin, true
51+
}
52+
}
53+
return nil, false
54+
}
55+
4056
// if a connection plugin for the plugin and connection, return it. If it does not, create it, store in map and return it
4157
// NOTE: there is special case logic got aggregate connections
4258
func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
@@ -45,22 +61,31 @@ func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipecon
4561
f.connectionLock.Lock()
4662
defer f.connectionLock.Unlock()
4763

48-
// build a map key for the plugin
49-
key := f.connectionPluginKey(pluginFQN, connectionName)
64+
var key string
65+
if pluginFQN != "" {
66+
// build a map key for the plugin
67+
key = f.connectionPluginKey(pluginFQN, connectionName)
68+
c, gotConnectionPlugin := f.connectionPlugins[key]
69+
if gotConnectionPlugin && !c.PluginClient.Exited() {
70+
return c, nil
71+
}
5072

51-
c, gotConnectionPlugin := f.connectionPlugins[key]
52-
if gotConnectionPlugin && !c.PluginClient.Exited() {
73+
if gotConnectionPlugin {
74+
log.Printf("[TRACE] connectionPlugin with key %s has exited - reloading", key)
75+
}
76+
}
77+
78+
// If plugin FQN is unknown (or key lookup missed), fall back to a connection-name-only lookup.
79+
if c, gotConnectionPlugin := f.getByConnectionName(connectionName); gotConnectionPlugin && !c.PluginClient.Exited() {
5380
return c, nil
5481
}
5582

5683
// so either we have not yet instantiated the connection plugin, or it has exited
57-
if !gotConnectionPlugin {
84+
if key != "" {
5885
log.Printf("[TRACE] no connectionPlugin loaded with key %s (len %d)", key, len(f.connectionPlugins))
59-
for k := range f.connectionPlugins {
60-
log.Printf("[TRACE] key: %s", k)
61-
}
62-
} else {
63-
log.Printf("[TRACE] connectionPluginwith key %s has exited - reloading", key)
86+
}
87+
for k := range f.connectionPlugins {
88+
log.Printf("[TRACE] key: %s", k)
6489
}
6590

6691
log.Printf("[TRACE] failed to get plugin: %s connection %s", pluginFQN, connectionName)
@@ -90,46 +115,107 @@ func (f *connectionFactory) createConnectionPlugin(pluginFQN string, connectionN
90115
defer f.connectionLock.Unlock()
91116
log.Printf("[TRACE] connectionFactory.createConnectionPlugin create connection %s", connectionName)
92117

93-
// load the config for this connection
94-
connection, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
95-
if !ok {
96-
log.Printf("[INFO] connectionFactory.createConnectionPlugin create connection %s - no config found so reloading config!", connectionName)
118+
// Prefer in-memory GlobalConfig when available (backward compatible for
119+
// connections loaded at startup), but do not read from disk on misses.
120+
if steampipeconfig.GlobalConfig != nil && steampipeconfig.GlobalConfig.Connections != nil {
121+
if connection, ok := steampipeconfig.GlobalConfig.Connections[connectionName]; ok {
122+
log.Printf("[TRACE] createConnectionPlugin plugin %s, connection %s, config: %s\n", utils.PluginFQNToSchemaName(pluginFQN), connectionName, connection.Config)
97123

98-
// ask hub to reload config - it's possible we are being asked to import a newly added connection
99-
// TODO remove need for hub to load config at all
100-
if _, err := f.hub.LoadConnectionConfig(); err != nil {
101-
log.Printf("[ERROR] LoadConnectionConfig failed %v ", err)
102-
return nil, err
103-
}
104-
// now try to get config again
105-
connection, ok = steampipeconfig.GlobalConfig.Connections[connectionName]
106-
if !ok {
107-
log.Printf("[WARN] no config found for connection %s", connectionName)
108-
return nil, fmt.Errorf("no config found for connection %s", connectionName)
124+
// get plugin manager
125+
pluginManager, err := pluginmanager.GetPluginManager()
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
connectionPlugins, res := steampipeconfig.CreateConnectionPlugins(pluginManager, []string{connection.Name})
131+
if res.Error != nil {
132+
return nil, res.Error
133+
}
134+
if connectionPlugins[connection.Name] == nil {
135+
if len(res.Warnings) > 0 {
136+
return nil, fmt.Errorf("%s", strings.Join(res.Warnings, ","))
137+
}
138+
return nil, fmt.Errorf("CreateConnectionPlugins did not return error but '%s' not found in connection map", connection.Name)
139+
}
140+
141+
connectionPlugin := connectionPlugins[connection.Name]
142+
f.add(connectionPlugin, connectionName)
143+
144+
return connectionPlugin, nil
109145
}
110146
}
111147

112-
log.Printf("[TRACE] createConnectionPlugin plugin %s, connection %s, config: %s\n", utils.PluginFQNToSchemaName(pluginFQN), connectionName, connection.Config)
148+
// No disk config available for this connection. Resolve entirely via
149+
// plugin manager in-memory state.
150+
connectionPlugin, err := f.createConnectionPluginFromPluginManager(connectionName)
151+
if err != nil {
152+
return nil, err
153+
}
154+
f.add(connectionPlugin, connectionName)
155+
return connectionPlugin, nil
156+
}
113157

114-
// get plugin manager
158+
// NOTE: caller must hold f.connectionLock
159+
func (f *connectionFactory) createConnectionPluginFromPluginManager(connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
115160
pluginManager, err := pluginmanager.GetPluginManager()
116161
if err != nil {
117162
return nil, err
118163
}
119164

120-
connectionPlugins, res := steampipeconfig.CreateConnectionPlugins(pluginManager, []string{connection.Name})
121-
if res.Error != nil {
122-
return nil, res.Error
165+
getResponse, err := pluginManager.Get(&pb.GetRequest{Connections: []string{connectionName}})
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
if failure, ok := getResponse.FailureMap[connectionName]; ok {
171+
return nil, fmt.Errorf("failed to load connection '%s': %s", connectionName, failure)
123172
}
124-
if connectionPlugins[connection.Name] == nil {
125-
if len(res.Warnings) > 0 {
126-
return nil, fmt.Errorf("%s", strings.Join(res.Warnings, ","))
173+
reattach, ok := getResponse.ReattachMap[connectionName]
174+
if !ok {
175+
return nil, fmt.Errorf("no reattach config returned for connection '%s'", connectionName)
176+
}
177+
if reattach.Pid == 0 {
178+
return nil, fmt.Errorf("reattach config has zero pid for connection '%s'", connectionName)
179+
}
180+
pluginInstance := reattach.Plugin
181+
if pluginInstance == "" {
182+
return nil, fmt.Errorf("reattach config missing plugin for connection '%s'", connectionName)
183+
}
184+
185+
pluginClient, err := sdkgrpc.NewPluginClientFromReattach(reattach.Convert(), pluginInstance)
186+
if err != nil {
187+
return nil, err
188+
}
189+
190+
connectionPlugin := steampipeconfig.NewConnectionPlugin(
191+
utils.PluginFQNToSchemaName(pluginInstance),
192+
pluginInstance,
193+
pluginClient,
194+
reattach.SupportedOperations,
195+
)
196+
197+
for _, c := range reattach.Connections {
198+
connectionPlugin.ConnectionMap[c] = &steampipeconfig.ConnectionPluginData{
199+
Name: c,
200+
Config: "",
201+
Type: "",
202+
Schema: &sdkproto.Schema{},
203+
}
204+
}
205+
if _, ok := connectionPlugin.ConnectionMap[connectionName]; !ok {
206+
connectionPlugin.ConnectionMap[connectionName] = &steampipeconfig.ConnectionPluginData{
207+
Name: connectionName,
208+
Config: "",
209+
Type: "",
210+
Schema: &sdkproto.Schema{},
127211
}
128-
return nil, fmt.Errorf("CreateConnectionPlugins did not return error but '%s' not found in connection map", connection.Name)
129212
}
130213

131-
connectionPlugin := connectionPlugins[connection.Name]
132-
f.add(connectionPlugin, connectionName)
214+
schema, err := pluginClient.GetSchema(connectionName)
215+
if err != nil {
216+
return nil, err
217+
}
218+
connectionPlugin.ConnectionMap[connectionName].Schema = schema
133219

134220
return connectionPlugin, nil
135221
}
@@ -149,7 +235,7 @@ func (f *connectionFactory) add(connectionPlugin *steampipeconfig.ConnectionPlug
149235
}
150236
}
151237

152-
func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*proto.Schema, error) {
238+
func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*sdkproto.Schema, error) {
153239
log.Printf("[TRACE] connectionFactory getSchema %s %s", pluginFQN, connectionName)
154240
// do we have this connection already loaded
155241
c, err := f.get(pluginFQN, connectionName)

hub/hub_remote.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package hub
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"log"
87
"os"
98
"path"
@@ -56,6 +55,7 @@ func newRemoteHub() (*RemoteHub, error) {
5655
}
5756
app_specific.InstallDir = steampipeDir
5857

58+
// Load config metadata at startup for backward-compatible aggregator behavior.
5959
log.Printf("[INFO] newRemoteHub RemoteHub.LoadConnectionConfig ")
6060
if _, err := hub.LoadConnectionConfig(); err != nil {
6161
return nil, err
@@ -175,19 +175,17 @@ func (h *RemoteHub) startScanForConnection(connectionName string, table string,
175175
// ok so this is a multi connection plugin, build list of connections,
176176
// if this connection is NOT an aggregator, only execute for the named connection
177177

178-
// get connection config
179-
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
180-
if !ok {
181-
return nil, fmt.Errorf("no connection config loaded for connection '%s'", connectionName)
182-
}
183-
184178
var connectionNames = []string{connectionName}
185-
if connectionConfig.Type == modconfig.ConnectionTypeAggregator {
186-
connectionNames = connectionConfig.GetResolveConnectionNames()
187-
// if there are no connections, do not proceed
188-
if len(connectionNames) == 0 {
189-
return nil, errors.New(connectionConfig.GetEmptyAggregatorError())
179+
if connectionConfig, ok := h.loadConnectionConfigWithFallback(connectionName, true, true); ok {
180+
if connectionConfig.Type == modconfig.ConnectionTypeAggregator {
181+
connectionNames = connectionConfig.GetResolveConnectionNames()
182+
// if there are no connections, do not proceed
183+
if len(connectionNames) == 0 {
184+
return nil, errors.New(connectionConfig.GetEmptyAggregatorError())
185+
}
190186
}
187+
} else {
188+
log.Printf("[TRACE] no GlobalConfig entry for connection '%s' - using single-connection execution path", connectionName)
191189
}
192190

193191
// for each connection, determine whether to pushdown the limit
@@ -249,13 +247,13 @@ func (h *RemoteHub) buildConnectionLimitMap(table string, qualMap map[string]*pr
249247
func (h *RemoteHub) getConnectionPlugin(connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
250248
log.Printf("[TRACE] hub.getConnectionPlugin for connection '%s`", connectionName)
251249

252-
// get the plugin FQN
253-
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
254-
if !ok {
255-
log.Printf("[WARN] no connection config loaded for connection '%s'", connectionName)
256-
return nil, fmt.Errorf("no connection config loaded for connection '%s'", connectionName)
250+
pluginFQN := ""
251+
if connectionConfig, ok := h.loadConnectionConfigWithFallback(connectionName, true, false); ok {
252+
pluginFQN = connectionConfig.Plugin
253+
}
254+
if pluginFQN == "" {
255+
log.Printf("[TRACE] no plugin FQN in GlobalConfig for connection '%s' - resolving via plugin manager", connectionName)
257256
}
258-
pluginFQN := connectionConfig.Plugin
259257

260258
// ask connection map to get or create this connection
261259
c, err := h.connections.getOrCreate(pluginFQN, connectionName)
@@ -267,6 +265,35 @@ func (h *RemoteHub) getConnectionPlugin(connectionName string) (*steampipeconfig
267265
return c, nil
268266
}
269267

268+
func (h *RemoteHub) loadConnectionConfigWithFallback(connectionName string, refreshOnMiss bool, refreshIfAggregator bool) (*modconfig.SteampipeConnection, bool) {
269+
getCurrent := func() (*modconfig.SteampipeConnection, bool) {
270+
if steampipeconfig.GlobalConfig == nil || steampipeconfig.GlobalConfig.Connections == nil {
271+
return nil, false
272+
}
273+
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
274+
return connectionConfig, ok
275+
}
276+
277+
connectionConfig, ok := getCurrent()
278+
if !ok && refreshOnMiss {
279+
log.Printf("[TRACE] loadConnectionConfigWithFallback: missing '%s' in GlobalConfig - reloading config", connectionName)
280+
if _, err := h.LoadConnectionConfig(); err != nil {
281+
log.Printf("[WARN] loadConnectionConfigWithFallback reload failed for '%s': %s", connectionName, err)
282+
}
283+
connectionConfig, ok = getCurrent()
284+
}
285+
286+
if ok && refreshIfAggregator && connectionConfig.Type == modconfig.ConnectionTypeAggregator {
287+
log.Printf("[TRACE] loadConnectionConfigWithFallback: refreshing aggregator metadata for '%s'", connectionName)
288+
if _, err := h.LoadConnectionConfig(); err != nil {
289+
log.Printf("[WARN] loadConnectionConfigWithFallback aggregator reload failed for '%s': %s", connectionName, err)
290+
}
291+
connectionConfig, ok = getCurrent()
292+
}
293+
294+
return connectionConfig, ok
295+
}
296+
270297
func (h *RemoteHub) clearConnectionCache(connection string) error {
271298
log.Printf("[INFO] clear connection cache for connection '%s'", connection)
272299
connectionPlugin, err := h.getConnectionPlugin(connection)

0 commit comments

Comments
 (0)