Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions fdw.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,6 @@ func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double,
FdwError(sperr.WrapWithMessage(err, "failed to process options"))
}

// reload connection config
// TODO remove need for fdw to load connection config
_, err = pluginHub.LoadConnectionConfig()
if err != nil {
log.Printf("[ERROR] LoadConnectionConfig failed %v ", err)
FdwError(err)
return
}

tableOpts := GetFTableOptions(types.Oid(state.foreigntableid))

// Extract trace context if available
Expand Down
162 changes: 124 additions & 38 deletions hub/connection_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"sync"

"github.com/turbot/pipe-fittings/v2/utils"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe/v2/pkg/pluginmanager"
pb "github.com/turbot/steampipe/v2/pkg/pluginmanager_service/grpc/proto"

"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe/v2/pkg/steampipeconfig"
)

Expand All @@ -32,11 +34,25 @@ func newConnectionFactory(hub *RemoteHub) *connectionFactory {
// extract the plugin FQN and connection name from a map key
func (f *connectionFactory) parsePluginKey(key string) (pluginFQN, connectionName string) {
split := strings.Split(key, keySeparator)
if len(split) < 2 {
return "", ""
}
pluginFQN = split[0]
connectionName = split[1]
return
}

// NOTE: caller must hold f.connectionLock
func (f *connectionFactory) getByConnectionName(connectionName string) (*steampipeconfig.ConnectionPlugin, bool) {
for key, connectionPlugin := range f.connectionPlugins {
_, existingConnection := f.parsePluginKey(key)
if existingConnection == connectionName {
return connectionPlugin, true
}
}
return nil, false
}

// if a connection plugin for the plugin and connection, return it. If it does not, create it, store in map and return it
// NOTE: there is special case logic got aggregate connections
func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
Expand All @@ -45,22 +61,31 @@ func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipecon
f.connectionLock.Lock()
defer f.connectionLock.Unlock()

// build a map key for the plugin
key := f.connectionPluginKey(pluginFQN, connectionName)
var key string
if pluginFQN != "" {
// build a map key for the plugin
key = f.connectionPluginKey(pluginFQN, connectionName)
c, gotConnectionPlugin := f.connectionPlugins[key]
if gotConnectionPlugin && !c.PluginClient.Exited() {
return c, nil
}

c, gotConnectionPlugin := f.connectionPlugins[key]
if gotConnectionPlugin && !c.PluginClient.Exited() {
if gotConnectionPlugin {
log.Printf("[TRACE] connectionPlugin with key %s has exited - reloading", key)
}
}

// If plugin FQN is unknown (or key lookup missed), fall back to a connection-name-only lookup.
if c, gotConnectionPlugin := f.getByConnectionName(connectionName); gotConnectionPlugin && !c.PluginClient.Exited() {
return c, nil
}

// so either we have not yet instantiated the connection plugin, or it has exited
if !gotConnectionPlugin {
if key != "" {
log.Printf("[TRACE] no connectionPlugin loaded with key %s (len %d)", key, len(f.connectionPlugins))
for k := range f.connectionPlugins {
log.Printf("[TRACE] key: %s", k)
}
} else {
log.Printf("[TRACE] connectionPluginwith key %s has exited - reloading", key)
}
for k := range f.connectionPlugins {
log.Printf("[TRACE] key: %s", k)
}

log.Printf("[TRACE] failed to get plugin: %s connection %s", pluginFQN, connectionName)
Expand Down Expand Up @@ -90,46 +115,107 @@ func (f *connectionFactory) createConnectionPlugin(pluginFQN string, connectionN
defer f.connectionLock.Unlock()
log.Printf("[TRACE] connectionFactory.createConnectionPlugin create connection %s", connectionName)

// load the config for this connection
connection, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
if !ok {
log.Printf("[INFO] connectionFactory.createConnectionPlugin create connection %s - no config found so reloading config!", connectionName)
// Prefer in-memory GlobalConfig when available (backward compatible for
// connections loaded at startup), but do not read from disk on misses.
if steampipeconfig.GlobalConfig != nil && steampipeconfig.GlobalConfig.Connections != nil {
if connection, ok := steampipeconfig.GlobalConfig.Connections[connectionName]; ok {
log.Printf("[TRACE] createConnectionPlugin plugin %s, connection %s, config: %s\n", utils.PluginFQNToSchemaName(pluginFQN), connectionName, connection.Config)

// ask hub to reload config - it's possible we are being asked to import a newly added connection
// TODO remove need for hub to load config at all
if _, err := f.hub.LoadConnectionConfig(); err != nil {
log.Printf("[ERROR] LoadConnectionConfig failed %v ", err)
return nil, err
}
// now try to get config again
connection, ok = steampipeconfig.GlobalConfig.Connections[connectionName]
if !ok {
log.Printf("[WARN] no config found for connection %s", connectionName)
return nil, fmt.Errorf("no config found for connection %s", connectionName)
// get plugin manager
pluginManager, err := pluginmanager.GetPluginManager()
if err != nil {
return nil, err
}

connectionPlugins, res := steampipeconfig.CreateConnectionPlugins(pluginManager, []string{connection.Name})
if res.Error != nil {
return nil, res.Error
}
if connectionPlugins[connection.Name] == nil {
if len(res.Warnings) > 0 {
return nil, fmt.Errorf("%s", strings.Join(res.Warnings, ","))
}
return nil, fmt.Errorf("CreateConnectionPlugins did not return error but '%s' not found in connection map", connection.Name)
}

connectionPlugin := connectionPlugins[connection.Name]
f.add(connectionPlugin, connectionName)

return connectionPlugin, nil
}
}

log.Printf("[TRACE] createConnectionPlugin plugin %s, connection %s, config: %s\n", utils.PluginFQNToSchemaName(pluginFQN), connectionName, connection.Config)
// No disk config available for this connection. Resolve entirely via
// plugin manager in-memory state.
connectionPlugin, err := f.createConnectionPluginFromPluginManager(connectionName)
if err != nil {
return nil, err
}
f.add(connectionPlugin, connectionName)
return connectionPlugin, nil
}

// get plugin manager
// NOTE: caller must hold f.connectionLock
func (f *connectionFactory) createConnectionPluginFromPluginManager(connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
pluginManager, err := pluginmanager.GetPluginManager()
if err != nil {
return nil, err
}

connectionPlugins, res := steampipeconfig.CreateConnectionPlugins(pluginManager, []string{connection.Name})
if res.Error != nil {
return nil, res.Error
getResponse, err := pluginManager.Get(&pb.GetRequest{Connections: []string{connectionName}})
if err != nil {
return nil, err
}

if failure, ok := getResponse.FailureMap[connectionName]; ok {
return nil, fmt.Errorf("failed to load connection '%s': %s", connectionName, failure)
}
if connectionPlugins[connection.Name] == nil {
if len(res.Warnings) > 0 {
return nil, fmt.Errorf("%s", strings.Join(res.Warnings, ","))
reattach, ok := getResponse.ReattachMap[connectionName]
if !ok {
return nil, fmt.Errorf("no reattach config returned for connection '%s'", connectionName)
}
if reattach.Pid == 0 {
return nil, fmt.Errorf("reattach config has zero pid for connection '%s'", connectionName)
}
pluginInstance := reattach.Plugin
if pluginInstance == "" {
return nil, fmt.Errorf("reattach config missing plugin for connection '%s'", connectionName)
}

pluginClient, err := sdkgrpc.NewPluginClientFromReattach(reattach.Convert(), pluginInstance)
if err != nil {
return nil, err
}

connectionPlugin := steampipeconfig.NewConnectionPlugin(
utils.PluginFQNToSchemaName(pluginInstance),
pluginInstance,
pluginClient,
reattach.SupportedOperations,
)

for _, c := range reattach.Connections {
connectionPlugin.ConnectionMap[c] = &steampipeconfig.ConnectionPluginData{
Name: c,
Config: "",
Type: "",
Schema: &sdkproto.Schema{},
}
}
if _, ok := connectionPlugin.ConnectionMap[connectionName]; !ok {
connectionPlugin.ConnectionMap[connectionName] = &steampipeconfig.ConnectionPluginData{
Name: connectionName,
Config: "",
Type: "",
Schema: &sdkproto.Schema{},
}
return nil, fmt.Errorf("CreateConnectionPlugins did not return error but '%s' not found in connection map", connection.Name)
}

connectionPlugin := connectionPlugins[connection.Name]
f.add(connectionPlugin, connectionName)
schema, err := pluginClient.GetSchema(connectionName)
if err != nil {
return nil, err
}
connectionPlugin.ConnectionMap[connectionName].Schema = schema

return connectionPlugin, nil
}
Expand All @@ -149,7 +235,7 @@ func (f *connectionFactory) add(connectionPlugin *steampipeconfig.ConnectionPlug
}
}

func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*proto.Schema, error) {
func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*sdkproto.Schema, error) {
log.Printf("[TRACE] connectionFactory getSchema %s %s", pluginFQN, connectionName)
// do we have this connection already loaded
c, err := f.get(pluginFQN, connectionName)
Expand Down
63 changes: 45 additions & 18 deletions hub/hub_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package hub
import (
"context"
"errors"
"fmt"
"log"
"os"
"path"
Expand Down Expand Up @@ -56,6 +55,7 @@ func newRemoteHub() (*RemoteHub, error) {
}
app_specific.InstallDir = steampipeDir

// Load config metadata at startup for backward-compatible aggregator behavior.
log.Printf("[INFO] newRemoteHub RemoteHub.LoadConnectionConfig ")
if _, err := hub.LoadConnectionConfig(); err != nil {
return nil, err
Expand Down Expand Up @@ -175,19 +175,17 @@ func (h *RemoteHub) startScanForConnection(connectionName string, table string,
// ok so this is a multi connection plugin, build list of connections,
// if this connection is NOT an aggregator, only execute for the named connection

// get connection config
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
if !ok {
return nil, fmt.Errorf("no connection config loaded for connection '%s'", connectionName)
}

var connectionNames = []string{connectionName}
if connectionConfig.Type == modconfig.ConnectionTypeAggregator {
connectionNames = connectionConfig.GetResolveConnectionNames()
// if there are no connections, do not proceed
if len(connectionNames) == 0 {
return nil, errors.New(connectionConfig.GetEmptyAggregatorError())
if connectionConfig, ok := h.loadConnectionConfigWithFallback(connectionName, true, true); ok {
if connectionConfig.Type == modconfig.ConnectionTypeAggregator {
connectionNames = connectionConfig.GetResolveConnectionNames()
// if there are no connections, do not proceed
if len(connectionNames) == 0 {
return nil, errors.New(connectionConfig.GetEmptyAggregatorError())
}
}
} else {
log.Printf("[TRACE] no GlobalConfig entry for connection '%s' - using single-connection execution path", connectionName)
}

// for each connection, determine whether to pushdown the limit
Expand Down Expand Up @@ -249,13 +247,13 @@ func (h *RemoteHub) buildConnectionLimitMap(table string, qualMap map[string]*pr
func (h *RemoteHub) getConnectionPlugin(connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
log.Printf("[TRACE] hub.getConnectionPlugin for connection '%s`", connectionName)

// get the plugin FQN
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
if !ok {
log.Printf("[WARN] no connection config loaded for connection '%s'", connectionName)
return nil, fmt.Errorf("no connection config loaded for connection '%s'", connectionName)
pluginFQN := ""
if connectionConfig, ok := h.loadConnectionConfigWithFallback(connectionName, true, false); ok {
pluginFQN = connectionConfig.Plugin
}
if pluginFQN == "" {
log.Printf("[TRACE] no plugin FQN in GlobalConfig for connection '%s' - resolving via plugin manager", connectionName)
}
pluginFQN := connectionConfig.Plugin

// ask connection map to get or create this connection
c, err := h.connections.getOrCreate(pluginFQN, connectionName)
Expand All @@ -267,6 +265,35 @@ func (h *RemoteHub) getConnectionPlugin(connectionName string) (*steampipeconfig
return c, nil
}

func (h *RemoteHub) loadConnectionConfigWithFallback(connectionName string, refreshOnMiss bool, refreshIfAggregator bool) (*modconfig.SteampipeConnection, bool) {
getCurrent := func() (*modconfig.SteampipeConnection, bool) {
if steampipeconfig.GlobalConfig == nil || steampipeconfig.GlobalConfig.Connections == nil {
return nil, false
}
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
return connectionConfig, ok
}

connectionConfig, ok := getCurrent()
if !ok && refreshOnMiss {
log.Printf("[TRACE] loadConnectionConfigWithFallback: missing '%s' in GlobalConfig - reloading config", connectionName)
if _, err := h.LoadConnectionConfig(); err != nil {
log.Printf("[WARN] loadConnectionConfigWithFallback reload failed for '%s': %s", connectionName, err)
}
connectionConfig, ok = getCurrent()
}

if ok && refreshIfAggregator && connectionConfig.Type == modconfig.ConnectionTypeAggregator {
log.Printf("[TRACE] loadConnectionConfigWithFallback: refreshing aggregator metadata for '%s'", connectionName)
if _, err := h.LoadConnectionConfig(); err != nil {
log.Printf("[WARN] loadConnectionConfigWithFallback aggregator reload failed for '%s': %s", connectionName, err)
}
connectionConfig, ok = getCurrent()
}

return connectionConfig, ok
}

func (h *RemoteHub) clearConnectionCache(connection string) error {
log.Printf("[INFO] clear connection cache for connection '%s'", connection)
connectionPlugin, err := h.getConnectionPlugin(connection)
Expand Down