diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md index ad6147ca3a4..b3ce3357ca1 100644 --- a/docs/release-notes/release-notes-0.21.0.md +++ b/docs/release-notes/release-notes-0.21.0.md @@ -409,6 +409,12 @@ fallback](https://github.com/lightningnetwork/lnd/pull/10717) so that gossip channel filtering and zombie edge lookups use the correct gossip version instead of hardcoding v1. +* Make the [graph `Store` interface + cross-version](https://github.com/lightningnetwork/lnd/pull/10714) so that + `ForEachNode`, `ForEachChannel`, and `ForEachNodeDirectedChannel` work across + gossip v1 and v2. Add `Preferred` fetch helpers and `GetVersions` queries + so callers can retrieve channels without knowing which gossip version + announced them. * Updated waiting proof persistence for gossip upgrades by introducing typed waiting proof keys and payloads, with a DB migration to rewrite legacy waiting proof records to the new key/value format diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index d0764446f7f..2076ccd1ba5 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -348,8 +348,7 @@ func TestPopulateDBs(t *testing.T) { // graph. 
countNodes := func(graph *ChannelGraph) int { numNodes := 0 - v1Graph := NewVersionedGraph(graph, lnwire.GossipVersion1) - err := v1Graph.ForEachNode( + err := graph.ForEachNode( ctx, func(node *models.Node) error { numNodes++ @@ -372,7 +371,7 @@ func TestPopulateDBs(t *testing.T) { numPolicies = 0 ) err := graph.ForEachChannel( - ctx, lnwire.GossipVersion1, + ctx, func(info *models.ChannelEdgeInfo, policy, policy2 *models.ChannelEdgePolicy) error { @@ -500,7 +499,7 @@ func syncGraph(t *testing.T, src, dest *ChannelGraph) { } var wgChans sync.WaitGroup - err = src.ForEachChannel(ctx, lnwire.GossipVersion1, + err = src.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, policy1, policy2 *models.ChannelEdgePolicy) error { @@ -624,7 +623,7 @@ func BenchmarkGraphReadMethods(b *testing.B) { name: "ForEachNode", fn: func(b testing.TB, store Store) { err := store.ForEachNode( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.Node) error { // Increment the counter to // ensure the callback is doing @@ -640,12 +639,11 @@ func BenchmarkGraphReadMethods(b *testing.B) { { name: "ForEachChannel", fn: func(b testing.TB, store Store) { - //nolint:ll - err := store.ForEachChannel( - ctx, lnwire.GossipVersion1, + err := store.ForEachChannel(ctx, func(_ *models.ChannelEdgeInfo, + _, _ *models.ChannelEdgePolicy, - _ *models.ChannelEdgePolicy) error { + ) error { // Increment the counter to // ensure the callback is doing @@ -996,7 +994,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) { ) err := store.ForEachNode( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.Node) error { numNodes++ @@ -1007,7 +1005,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) { //nolint:ll err = store.ForEachChannel( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.ChannelEdgeInfo, _, _ *models.ChannelEdgePolicy) error { diff --git a/graph/db/graph.go b/graph/db/graph.go index 9e72cad2a33..a19487b75f5 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -238,12 +238,21 @@ 
func (c *ChannelGraph) populateCache(ctx context.Context) error { for _, v := range []lnwire.GossipVersion{ gossipV1, gossipV2, } { - // TODO(elle): If we have both v1 and v2 entries for the same - // node/channel, prefer v2 when merging. + // We iterate v1 first, then v2. AddNodeFeatures and AddChannel + // overwrite on key collision, so v2 data takes precedence when + // both versions exist. For features specifically we + // additionally skip empty v2 entries so they don't shadow a + // non-empty v1 feature set; this matches the no-cache + // FetchNodeFeatures fallback rule that a non-empty + // lower-version vector wins over an empty higher-version one. err := c.db.ForEachNodeCacheable(ctx, v, func(node route.Vertex, features *lnwire.FeatureVector) error { + if v == gossipV2 && features.IsEmpty() { + return nil + } + cache.AddNodeFeatures(node, features) return nil @@ -299,9 +308,8 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(ctx context.Context, return c.cache.graphCache.ForEachChannel(node, cb) } - // TODO(elle): once the no-cache path needs to support - // pathfinding across gossip versions, this should iterate - // across all versions rather than defaulting to v1. + // The no-cache path only runs against the KV backend, which is + // v1-only. return c.db.ForEachNodeDirectedChannel( ctx, gossipV1, node, cb, reset, ) @@ -320,7 +328,9 @@ func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context, return c.cache.graphCache.GetFeatures(node), nil } - return c.db.FetchNodeFeatures(ctx, lnwire.GossipVersion1, node) + // The no-cache path only runs against the KV backend, which is + // v1-only. + return c.db.FetchNodeFeatures(ctx, gossipV1, node) } // GraphSession will provide the call-back with access to a NodeTraverser @@ -679,13 +689,22 @@ func (c *ChannelGraph) HasV1Node(ctx context.Context, return c.db.HasV1Node(ctx, nodePub) } -// ForEachChannel iterates through all channel edges stored within the graph. 
+// ForEachChannel iterates through all channel edges stored within the graph +// across all gossip versions. func (c *ChannelGraph) ForEachChannel(ctx context.Context, - v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error, reset func()) error { - return c.db.ForEachChannel(ctx, v, cb, reset) + return c.db.ForEachChannel(ctx, cb, reset) +} + +// ForEachNode iterates through all stored vertices/nodes in the graph across +// all gossip versions. +func (c *ChannelGraph) ForEachNode(ctx context.Context, + cb func(*models.Node) error, reset func()) error { + + return c.db.ForEachNode(ctx, cb, reset) } // DisabledChannelIDs returns the channel ids of disabled channels. @@ -817,26 +836,39 @@ func (c *ChannelGraph) FetchChanInfos(ctx context.Context, } // FetchChannelEdgesByOutpoint attempts to lookup directed edges by funding -// outpoint. +// outpoint, returning the edges for the preferred gossip version. func (c *ChannelGraph) FetchChannelEdgesByOutpoint(ctx context.Context, op *wire.OutPoint) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { - return c.db.FetchChannelEdgesByOutpoint( - ctx, lnwire.GossipVersion1, op, - ) + return c.db.FetchChannelEdgesByOutpointPreferred(ctx, op) } -// FetchChannelEdgesByID attempts to lookup directed edges by channel ID. +// FetchChannelEdgesByID attempts to lookup directed edges by channel ID, +// returning the edges for the preferred gossip version. 
func (c *ChannelGraph) FetchChannelEdgesByID(ctx context.Context, chanID uint64) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { - return c.db.FetchChannelEdgesByID( - ctx, lnwire.GossipVersion1, chanID, - ) + return c.db.FetchChannelEdgesByIDPreferred(ctx, chanID) +} + +// GetVersionsBySCID returns the list of gossip versions for which a channel +// with the given SCID exists in the database. +func (c *ChannelGraph) GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) { + + return c.db.GetVersionsBySCID(ctx, chanID) +} + +// GetVersionsByOutpoint returns the list of gossip versions for which a channel +// with the given funding outpoint exists in the database. +func (c *ChannelGraph) GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) { + + return c.db.GetVersionsByOutpoint(ctx, op) } // PutClosedScid stores a SCID for a closed channel in the database. @@ -927,13 +959,6 @@ func (c *VersionedGraph) ForEachNodeCached(ctx context.Context, return c.ChannelGraph.ForEachNodeCached(ctx, c.v, withAddrs, cb, reset) } -// ForEachNode iterates through all stored vertices/nodes in the graph. -func (c *VersionedGraph) ForEachNode(ctx context.Context, - cb func(*models.Node) error, reset func()) error { - - return c.db.ForEachNode(ctx, c.v, cb, reset) -} - // NumZombies returns the current number of zombie channels in the graph. func (c *VersionedGraph) NumZombies(ctx context.Context) (uint64, error) { return c.db.NumZombies(ctx, c.v) @@ -969,7 +994,7 @@ func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint, // for performing queries against the channel graph. If the graph cache is // enabled, the callback receives the VersionedGraph directly (which implements // NodeTraverser using the cache). Otherwise a read-only database session is -// used. +// used; the no-cache path only runs against the KV backend, which is v1-only. 
func (c *VersionedGraph) GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error { @@ -977,9 +1002,6 @@ func (c *VersionedGraph) GraphSession(ctx context.Context, return cb(c) } - // TODO(elle): the underlying GraphSession currently creates a - // NodeTraverser that is hardcoded to GossipVersion1. This needs to be - // updated to pass the version through for v2 support. return c.db.GraphSession(ctx, cb, reset) } @@ -1107,14 +1129,6 @@ func (c *VersionedGraph) ForEachNodeChannel(ctx context.Context, return c.db.ForEachNodeChannel(ctx, c.v, nodePub, cb, reset) } -// ForEachChannel iterates through all channel edges stored within the graph. -func (c *VersionedGraph) ForEachChannel(ctx context.Context, - cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { - - return c.db.ForEachChannel(ctx, c.v, cb, reset) -} - // ForEachNodeCacheable iterates through all stored vertices/nodes in the graph. func (c *VersionedGraph) ForEachNodeCacheable(ctx context.Context, cb func(route.Vertex, *lnwire.FeatureVector) error, diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 6521a849e51..900c7211a7d 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -1824,7 +1824,7 @@ func TestGraphTraversal(t *testing.T) { // Iterate through all the known channels within the graph DB, once // again if the map is empty that indicates that all edges have // properly been reached. 
- err = graph.ForEachChannel(ctx, lnwire.GossipVersion1, + err = graph.ForEachChannel(ctx, func(ei *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy, _ *models.ChannelEdgePolicy) error { @@ -2179,7 +2179,7 @@ func assertPruneTip(t *testing.T, graph *ChannelGraph, func assertNumChans(t *testing.T, graph *ChannelGraph, n int) { numChans := 0 err := graph.ForEachChannel( - t.Context(), lnwire.GossipVersion1, + t.Context(), func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error { @@ -2196,8 +2196,7 @@ func assertNumChans(t *testing.T, graph *ChannelGraph, n int) { func assertNumNodes(t *testing.T, graph *ChannelGraph, n int) { numNodes := 0 - v1Graph := NewVersionedGraph(graph, lnwire.GossipVersion1) - err := v1Graph.ForEachNode(t.Context(), func(_ *models.Node) error { + err := graph.ForEachNode(t.Context(), func(_ *models.Node) error { numNodes++ return nil @@ -6794,3 +6793,590 @@ func TestUpdateRangeValidateForVersion(t *testing.T) { }) } } + +// TestPreferredChannelAndGetVersions tests the four new Store methods: +// FetchChannelEdgesByIDPreferred, FetchChannelEdgesByOutpointPreferred, +// GetVersionsBySCID, and GetVersionsByOutpoint. +func TestPreferredChannelAndGetVersions(t *testing.T) { + t.Parallel() + ctx := t.Context() + + graph := MakeTestGraph(t) + store := graph.db + + node1Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + node2Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + node1V1 := createNode(t, lnwire.GossipVersion1, node1Priv) + node2V1 := createNode(t, lnwire.GossipVersion1, node2Priv) + + require.NoError(t, graph.AddNode(ctx, node1V1)) + require.NoError(t, graph.AddNode(ctx, node2V1)) + + // Create and add a v1 channel edge. 
+ edgeInfo, scid := createEdge( + lnwire.GossipVersion1, 100, 1, 0, 1, node1V1, node2V1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) + + chanID := scid.ToUint64() + op := edgeInfo.ChannelPoint + + // FetchChannelEdgesByIDPreferred should return the v1 channel. + info, _, _, err := store.FetchChannelEdgesByIDPreferred( + ctx, chanID, + ) + require.NoError(t, err) + require.Equal(t, chanID, info.ChannelID) + + // FetchChannelEdgesByOutpointPreferred should also return it. + info, _, _, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &op, + ) + require.NoError(t, err) + require.Equal(t, chanID, info.ChannelID) + + // Querying a non-existent channel should return an error. + _, _, _, err = store.FetchChannelEdgesByIDPreferred(ctx, 999999) + require.ErrorIs(t, err, ErrEdgeNotFound) + + // GetVersionsBySCID should report v1. + versions, err := store.GetVersionsBySCID(ctx, chanID) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + }, versions) + + // GetVersionsByOutpoint should also report v1. + versions, err = store.GetVersionsByOutpoint(ctx, &op) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + }, versions) + + // GetVersions for a non-existent SCID should return empty. 
+ versions, err = store.GetVersionsBySCID(ctx, 999999) + require.NoError(t, err) + require.Empty(t, versions) + + unknownOutpoint := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 99, + } + versions, err = store.GetVersionsByOutpoint(ctx, &unknownOutpoint) + require.NoError(t, err) + require.Empty(t, versions) + + zombieChanID := uint64(888888) + err = store.MarkEdgeZombie( + ctx, lnwire.GossipVersion1, zombieChanID, + node1V1.PubKeyBytes, node2V1.PubKeyBytes, + ) + require.NoError(t, err) + + info, _, _, err = store.FetchChannelEdgesByIDPreferred( + ctx, zombieChanID, + ) + require.ErrorIs(t, err, ErrZombieEdge) + require.NotNil(t, info) + require.Equal(t, route.Vertex(node1V1.PubKeyBytes), info.NodeKey1Bytes) + require.Equal(t, route.Vertex(node2V1.PubKeyBytes), info.NodeKey2Bytes) + + if !isSQLDB { + return + } + + node1V2 := createNode(t, lnwire.GossipVersion2, node1Priv) + node2V2 := createNode(t, lnwire.GossipVersion2, node2Priv) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + // Add a duplicate v1/v2 channel and verify preferred lookup chooses + // the v2 edge while GetVersions reports both versions. 
+ dupV1, dupSCID := createEdge( + lnwire.GossipVersion1, 101, 1, 0, 2, node1V1, node2V1, + ) + dupV2, _ := createEdge( + lnwire.GossipVersion2, 101, 1, 0, 2, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, dupV1)) + require.NoError(t, graph.AddChannelEdge(ctx, dupV2)) + + dupChanID := dupSCID.ToUint64() + dupOutpoint := dupV1.ChannelPoint + + info, _, _, err = store.FetchChannelEdgesByIDPreferred( + ctx, dupChanID, + ) + require.NoError(t, err) + require.Equal(t, dupChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion2, info.Version) + + info, _, _, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &dupOutpoint, + ) + require.NoError(t, err) + require.Equal(t, dupChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion2, info.Version) + versions, err = store.GetVersionsBySCID(ctx, dupChanID) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + lnwire.GossipVersion2, + }, versions) + + versions, err = store.GetVersionsByOutpoint(ctx, &dupOutpoint) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + lnwire.GossipVersion2, + }, versions) + + // Add another duplicate v1/v2 channel where only the v1 version has a + // policy. Preferred lookup should return the lower version with usable + // policy data instead of the higher version shell. 
+ policyPrefV1, policyPrefSCID := createEdge( + lnwire.GossipVersion1, 102, 1, 0, 3, node1V1, node2V1, + ) + policyPrefV2, _ := createEdge( + lnwire.GossipVersion2, 102, 1, 0, 3, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV2)) + + policyOnlyV1 := newEdgePolicy( + lnwire.GossipVersion1, policyPrefV1.ChannelID, 1000, true, + ) + policyOnlyV1.ToNode = node2V1.PubKeyBytes + policyOnlyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, policyOnlyV1)) + + policyPrefChanID := policyPrefSCID.ToUint64() + policyPrefOutpoint := policyPrefV1.ChannelPoint + + info, p1, p2, err := store.FetchChannelEdgesByIDPreferred( + ctx, policyPrefChanID, + ) + require.NoError(t, err) + require.Equal(t, policyPrefChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion1, info.Version) + require.NotNil(t, p1) + require.Nil(t, p2) + + info, p1, p2, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &policyPrefOutpoint, + ) + require.NoError(t, err) + require.Equal(t, policyPrefChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion1, info.Version) + require.NotNil(t, p1) + require.Nil(t, p2) +} + +// TestDeleteNodePreferredRecomputation verifies that deleting one gossip +// version of a dual-version node correctly recomputes the preferred-node +// mapping so the surviving version remains visible via ForEachNode. +func TestDeleteNodePreferredRecomputation(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + // Create a node with both v1 and v2 announcements. 
+ priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeV1 := createNode(t, lnwire.GossipVersion1, priv) + nodeV2 := createNode(t, lnwire.GossipVersion2, priv) + + require.NoError(t, graph.AddNode(ctx, nodeV1)) + require.NoError(t, graph.AddNode(ctx, nodeV2)) + + // ForEachNode should return the node (v2 preferred). + var count int + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + require.Equal(t, lnwire.GossipVersion2, n.Version) + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 1, count, "node should be visible before delete") + + // Delete the v2 version. + require.NoError(t, store.DeleteNode( + ctx, lnwire.GossipVersion2, nodeV1.PubKeyBytes, + )) + + // The node should still be visible via ForEachNode, now as v1. + count = 0 + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + require.Equal(t, lnwire.GossipVersion1, n.Version) + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 1, count, + "node should still be visible after deleting one version") + + // Delete the remaining v1 version. + require.NoError(t, store.DeleteNode( + ctx, lnwire.GossipVersion1, nodeV1.PubKeyBytes, + )) + + // The node should now be gone. + count = 0 + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 0, count, + "node should be gone after deleting all versions") +} + +// TestNoCacheNodeTraversal verifies that the no-cache fallback paths of +// ChannelGraph.FetchNodeFeatures and ChannelGraph.ForEachNodeDirectedChannel +// return v1 data correctly. The no-cache path is only ever reached on the KV +// backend in production (which is v1-only), so cross-version semantics are +// the cache path's job, not this one's. 
+func TestNoCacheNodeTraversal(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("test only meaningful for SQL backend") + } + + ctx := t.Context() + + // Disable the cache so we exercise the no-cache code paths. + graph := MakeTestGraph(t, WithUseGraphCache(false)) + + priv1, err := btcec.NewPrivateKey() + require.NoError(t, err) + priv2, err := btcec.NewPrivateKey() + require.NoError(t, err) + + node1 := createNode(t, lnwire.GossipVersion1, priv1) + node2 := createNode(t, lnwire.GossipVersion1, priv2) + require.NoError(t, graph.AddNode(ctx, node1)) + require.NoError(t, graph.AddNode(ctx, node2)) + + // FetchNodeFeatures should return the node's v1 features. + features, err := graph.FetchNodeFeatures(ctx, node1.PubKeyBytes) + require.NoError(t, err) + require.False(t, features.IsEmpty(), + "v1 node should have features") + + // Add a v1 channel and verify ForEachNodeDirectedChannel sees it. + edge, _ := createEdge( + lnwire.GossipVersion1, 100, 0, 0, 0, node1, node2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edge)) + + pol := newEdgePolicy( + lnwire.GossipVersion1, edge.ChannelID, 1000, true, + ) + pol.ToNode = node2.PubKeyBytes + pol.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, pol)) + + var foundChannels int + err = graph.ForEachNodeDirectedChannel( + ctx, node1.PubKeyBytes, + func(_ *DirectedChannel) error { + foundChannels++ + + return nil + }, func() { + foundChannels = 0 + }, + ) + require.NoError(t, err) + require.Equal(t, 1, foundChannels, + "expected 1 channel for v1 node") +} + +// TestPreferredForEachNode verifies that SQLStore.ForEachNode returns one +// node per pubkey, preferring the highest announced version and otherwise +// falling back to the highest-version shell node. 
+func TestPreferredForEachNode(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + v1Only := createTestVertex(t, lnwire.GossipVersion1) + v1Only.Alias = fn.Some("v1-only") + require.NoError(t, graph.AddNode(ctx, v1Only)) + + bothPriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + bothV1 := createNode(t, lnwire.GossipVersion1, bothPriv) + bothV1.Alias = fn.Some("both-v1") + require.NoError(t, graph.AddNode(ctx, bothV1)) + + bothV2 := createNode(t, lnwire.GossipVersion2, bothPriv) + bothV2.Alias = fn.Some("both-v2") + require.NoError(t, graph.AddNode(ctx, bothV2)) + + shellPriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + shellPub, err := route.NewVertexFromBytes( + shellPriv.PubKey().SerializeCompressed(), + ) + require.NoError(t, err) + + require.NoError(t, graph.AddNode( + ctx, models.NewShellNode(lnwire.GossipVersion1, shellPub), + )) + require.NoError(t, graph.AddNode( + ctx, models.NewShellNode(lnwire.GossipVersion2, shellPub), + )) + + var nodeCount int + nodesByPub := make(map[route.Vertex]*models.Node) + err = store.ForEachNode(ctx, func(node *models.Node) error { + nodesByPub[node.PubKeyBytes] = node + nodeCount++ + + return nil + }, func() { + clear(nodesByPub) + nodeCount = 0 + }) + require.NoError(t, err) + require.Len(t, nodesByPub, 3) + require.Equal(t, 3, nodeCount, "unexpected duplicate nodes") + + gotV1Only := nodesByPub[v1Only.PubKeyBytes] + require.NotNil(t, gotV1Only) + require.Equal(t, lnwire.GossipVersion1, gotV1Only.Version) + require.Equal(t, "v1-only", gotV1Only.Alias.UnwrapOr("")) + require.True(t, gotV1Only.HaveAnnouncement()) + + gotBoth := nodesByPub[bothV1.PubKeyBytes] + require.NotNil(t, gotBoth) + require.Equal(t, lnwire.GossipVersion2, gotBoth.Version) + require.Equal(t, "both-v2", gotBoth.Alias.UnwrapOr("")) + require.True(t, gotBoth.HaveAnnouncement()) + + gotShell := 
nodesByPub[shellPub] + require.NotNil(t, gotShell) + require.Equal(t, lnwire.GossipVersion2, gotShell.Version) + require.False(t, gotShell.HaveAnnouncement()) +} + +// TestPreferredForEachChannel verifies that SQLStore.ForEachChannel returns +// one channel per SCID, preferring a higher-version channel when both versions +// have policies, preserving lower-version policy data when the higher version +// has none, and otherwise falling back to the highest-version no-policy +// channel. +func TestPreferredForEachChannel(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + node1Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + node2Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + node1V1 := createNode(t, lnwire.GossipVersion1, node1Priv) + node1V2 := createNode(t, lnwire.GossipVersion2, node1Priv) + node2V1 := createNode(t, lnwire.GossipVersion1, node2Priv) + node2V2 := createNode(t, lnwire.GossipVersion2, node2Priv) + + require.NoError(t, graph.AddNode(ctx, node1V1)) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V1)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + v1Only, _ := createEdge( + lnwire.GossipVersion1, 200, 0, 0, 1, node1V1, node2V1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, v1Only)) + + policyPrefV1, _ := createEdge( + lnwire.GossipVersion1, 201, 0, 0, 2, node1V1, node2V1, + ) + policyPrefV2, _ := createEdge( + lnwire.GossipVersion2, 201, 0, 0, 2, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV2)) + + policyOnlyV1 := newEdgePolicy( + lnwire.GossipVersion1, policyPrefV1.ChannelID, 1000, true, + ) + policyOnlyV1.ToNode = node2V1.PubKeyBytes + policyOnlyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, policyOnlyV1)) + 
+ versionPrefV1, _ := createEdge( + lnwire.GossipVersion1, 202, 0, 0, 3, node1V1, node2V1, + ) + versionPrefV2, _ := createEdge( + lnwire.GossipVersion2, 202, 0, 0, 3, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, versionPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, versionPrefV2)) + + versionPolicyV1 := newEdgePolicy( + lnwire.GossipVersion1, versionPrefV1.ChannelID, 1001, true, + ) + versionPolicyV1.ToNode = node2V1.PubKeyBytes + versionPolicyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, versionPolicyV1)) + + versionPolicyV2 := newEdgePolicy( + lnwire.GossipVersion2, versionPrefV2.ChannelID, 1002, true, + ) + versionPolicyV2.ToNode = node2V2.PubKeyBytes + versionPolicyV2.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, versionPolicyV2)) + + shellPrefV1, _ := createEdge( + lnwire.GossipVersion1, 203, 0, 0, 4, node1V1, node2V1, + ) + shellPrefV2, _ := createEdge( + lnwire.GossipVersion2, 203, 0, 0, 4, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, shellPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, shellPrefV2)) + + type channelResult struct { + info *models.ChannelEdgeInfo + p1 *models.ChannelEdgePolicy + p2 *models.ChannelEdgePolicy + } + var chanCount int + channelsByID := make(map[uint64]channelResult) + err = store.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, + p1, p2 *models.ChannelEdgePolicy) error { + + channelsByID[info.ChannelID] = channelResult{ + info: info, + p1: p1, + p2: p2, + } + chanCount++ + + return nil + }, func() { + clear(channelsByID) + chanCount = 0 + }) + require.NoError(t, err) + require.Len(t, channelsByID, 4) + require.Equal(t, 4, chanCount, "unexpected duplicate channels") + + gotV1Only := channelsByID[v1Only.ChannelID] + require.Equal(t, lnwire.GossipVersion1, gotV1Only.info.Version) + require.Nil(t, gotV1Only.p1) + require.Nil(t, gotV1Only.p2) + + gotPolicyPref := 
channelsByID[policyPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion1, gotPolicyPref.info.Version) + require.NotNil(t, gotPolicyPref.p1) + require.Nil(t, gotPolicyPref.p2) + + gotVersionPref := channelsByID[versionPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion2, gotVersionPref.info.Version) + require.NotNil(t, gotVersionPref.p1) + + gotShellPref := channelsByID[shellPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion2, gotShellPref.info.Version) + require.Nil(t, gotShellPref.p1) + require.Nil(t, gotShellPref.p2) +} + +// TestForEachNodeCachedHonoursVersion verifies that +// SQLStore.ForEachNodeCached uses the requested gossip version when looking +// up channels for the iterated nodes, rather than silently falling back to +// v1 channels for v2 nodes. +func TestForEachNodeCachedHonoursVersion(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("test only meaningful for SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t, WithUseGraphCache(false)) + + // Create two nodes that exist under both v1 and v2. + priv1, err := btcec.NewPrivateKey() + require.NoError(t, err) + priv2, err := btcec.NewPrivateKey() + require.NoError(t, err) + + require.NoError(t, graph.AddNode( + ctx, createNode(t, lnwire.GossipVersion1, priv1), + )) + require.NoError(t, graph.AddNode( + ctx, createNode(t, lnwire.GossipVersion1, priv2), + )) + node1V2 := createNode(t, lnwire.GossipVersion2, priv1) + node2V2 := createNode(t, lnwire.GossipVersion2, priv2) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + // Add a channel only under v2 between the two nodes. If + // ForEachNodeCached honours its version parameter, callers asking for + // v2 nodes must see this channel; if it instead hardcodes v1 in the + // channel lookup, the channel will be missing. 
+ edgeV2, _ := createEdge( + lnwire.GossipVersion2, 100, 1, 0, 1, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edgeV2)) + + store := graph.db + chansSeen := 0 + err = store.ForEachNodeCached( + ctx, lnwire.GossipVersion2, false, + func(_ context.Context, n route.Vertex, _ []net.Addr, + chans map[uint64]*DirectedChannel) error { + + if n == node1V2.PubKeyBytes || + n == node2V2.PubKeyBytes { + + chansSeen += len(chans) + } + + return nil + }, func() { chansSeen = 0 }, + ) + require.NoError(t, err) + require.Equal(t, 2, chansSeen, + "v2 ForEachNodeCached should report the v2 channel for "+ + "each endpoint") +} diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index 0230300290f..1a172d9eb9b 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -95,11 +95,11 @@ type Store interface { //nolint:interfacebloat chans map[uint64]*DirectedChannel) error, reset func()) error - // ForEachNode iterates through all the stored vertices/nodes in the - // graph, executing the passed callback with each node encountered. If - // the callback returns an error, then the transaction is aborted and - // the iteration stops early. - ForEachNode(ctx context.Context, v lnwire.GossipVersion, + // ForEachNode iterates through all nodes in the graph across all + // gossip versions, yielding each unique node exactly once. The + // callback receives the best available Node (highest advertised + // version preferred, falling back to shell nodes). + ForEachNode(ctx context.Context, cb func(*models.Node) error, reset func()) error // ForEachNodeCacheable iterates through all the stored vertices/nodes @@ -162,21 +162,16 @@ type Store interface { //nolint:interfacebloat GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error - // ForEachChannel iterates through all the channel edges stored within - // the graph and invokes the passed callback for each edge. 
The callback - // takes two edges as since this is a directed graph, both the in/out - // edges are visited. If the callback returns an error, then the - // transaction is aborted and the iteration stops early. - // - // NOTE: If an edge can't be found, or wasn't advertised, then a nil - // pointer for that particular channel edge routing policy will be - // passed into the callback. - // - // TODO(elle): add a cross-version iteration API and make this iterate - // over all versions. - ForEachChannel(ctx context.Context, v lnwire.GossipVersion, + // ForEachChannel iterates through all channel edges stored within the + // graph across all gossip versions, yielding each unique channel + // exactly once. The callback receives the edge info and both + // directional policies. When both versions are present, the highest + // version with policies is preferred. Nil pointers are passed for + // policies that haven't been advertised. + ForEachChannel(ctx context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error + *models.ChannelEdgePolicy) error, + reset func()) error // ForEachChannelCacheable iterates through all the channel edges stored // within the graph and invokes the passed callback for each edge. The @@ -324,6 +319,37 @@ type Store interface { //nolint:interfacebloat *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) + // FetchChannelEdgesByIDPreferred behaves like FetchChannelEdgesByID + // but is version-agnostic: if the channel exists under multiple gossip + // versions it returns the preferred record. Preferred means the highest + // version with policies, falling back to the highest bare version. 
+ FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) + + // FetchChannelEdgesByOutpointPreferred behaves like + // FetchChannelEdgesByOutpoint but is version-agnostic: if the channel + // exists under multiple gossip versions it returns the preferred + // record. Preferred means the highest version with policies, falling + // back to the highest bare version. + FetchChannelEdgesByOutpointPreferred(ctx context.Context, + op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) + + // GetVersionsBySCID returns the list of gossip versions for which a + // channel with the given SCID exists in the database, ordered + // ascending. + GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) + + // GetVersionsByOutpoint returns the list of gossip versions for which + // a channel with the given funding outpoint exists in the database, + // ordered ascending. + GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) + // ChannelView returns the verifiable edge information for each active // channel within the known channel graph for the given gossip version. // The set of UTXO's (along with their scripts) returned are the ones diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index ed6d0e07e53..138c619d8f2 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -408,13 +408,10 @@ func (c *KVStore) AddrsForNode(ctx context.Context, v lnwire.GossipVersion, // NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer // for that particular channel edge routing policy will be passed into the // callback. 
-func (c *KVStore) ForEachChannel(_ context.Context, v lnwire.GossipVersion, +func (c *KVStore) ForEachChannel(_ context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { - - if v != lnwire.GossipVersion1 { - return ErrVersionNotSupportedForKVDB - } + *models.ChannelEdgePolicy) error, + reset func()) error { return forEachChannel(c.db, cb, reset) } @@ -842,13 +839,9 @@ func (c *KVStore) DisabledChannelIDs( // early. // // NOTE: this is part of the Store interface. -func (c *KVStore) ForEachNode(_ context.Context, v lnwire.GossipVersion, +func (c *KVStore) ForEachNode(_ context.Context, cb func(*models.Node) error, reset func()) error { - if v != lnwire.GossipVersion1 { - return ErrVersionNotSupportedForKVDB - } - return forEachNode(c.db, func(tx kvdb.RTx, node *models.Node) error { @@ -4198,6 +4191,85 @@ func (c *KVStore) FetchChannelEdgesByID(_ context.Context, return edgeInfo, policy1, policy2, nil } +// FetchChannelEdgesByIDPreferred looks up the channel by ID. The KV store +// only supports gossip v1, so this simply delegates to the versioned fetch. +// +// NOTE: part of the Store interface. +func (c *KVStore) FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + return c.FetchChannelEdgesByID(ctx, lnwire.GossipVersion1, chanID) +} + +// FetchChannelEdgesByOutpointPreferred looks up the channel by funding +// outpoint. The KV store only supports gossip v1, so this simply delegates to +// the versioned fetch. +// +// NOTE: part of the Store interface. 
+func (c *KVStore) FetchChannelEdgesByOutpointPreferred( + ctx context.Context, op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + return c.FetchChannelEdgesByOutpoint( + ctx, lnwire.GossipVersion1, op, + ) +} + +// GetVersionsBySCID returns the gossip versions for which a channel with the +// given SCID exists. The KV store only supports gossip v1, so at most one +// version is returned. +// +// NOTE: part of the Store interface. +func (c *KVStore) GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) { + + _, _, _, err := c.FetchChannelEdgesByID( + ctx, lnwire.GossipVersion1, chanID, + ) + switch { + case errors.Is(err, ErrEdgeNotFound): + return nil, nil + + case errors.Is(err, ErrZombieEdge): + return nil, nil + + case err != nil: + return nil, err + + default: + return []lnwire.GossipVersion{lnwire.GossipVersion1}, nil + } +} + +// GetVersionsByOutpoint returns the gossip versions for which a channel with +// the given funding outpoint exists. The KV store only supports gossip v1, so +// at most one version is returned. +// +// NOTE: part of the Store interface. +func (c *KVStore) GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) { + + _, _, _, err := c.FetchChannelEdgesByOutpoint( + ctx, lnwire.GossipVersion1, op, + ) + switch { + case errors.Is(err, ErrEdgeNotFound): + return nil, nil + + case errors.Is(err, ErrZombieEdge): + return nil, nil + + case err != nil: + return nil, err + + default: + return []lnwire.GossipVersion{lnwire.GossipVersion1}, nil + } +} + // IsPublicNode is a helper method that determines whether the node with the // given public key is seen as a public node in the graph from the graph's // source node's point of view. 
diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index f4824d75e1b..961c98ab394 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -54,7 +54,8 @@ type SQLQueries interface { GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) GetNodesByBlockHeightRange(ctx context.Context, arg sqlc.GetNodesByBlockHeightRangeParams) ([]sqlc.GraphNode, error) GetPublicNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetPublicNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) - ListNodesPaginated(ctx context.Context, arg sqlc.ListNodesPaginatedParams) ([]sqlc.GraphNode, error) + ListPreferredNodesPaginated(ctx context.Context, arg sqlc.ListPreferredNodesPaginatedParams) ([]sqlc.ListPreferredNodesPaginatedRow, error) + UpsertPreferredNode(ctx context.Context, pubKey []byte) error ListNodeIDsAndPubKeys(ctx context.Context, arg sqlc.ListNodeIDsAndPubKeysParams) ([]sqlc.ListNodeIDsAndPubKeysRow, error) IsPublicV1Node(ctx context.Context, pubKey []byte) (bool, error) IsPublicV2Node(ctx context.Context, pubKey []byte) (bool, error) @@ -105,6 +106,8 @@ type SQLQueries interface { ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error) ListChannelsForNodeIDs(ctx context.Context, arg sqlc.ListChannelsForNodeIDsParams) ([]sqlc.ListChannelsForNodeIDsRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error) + ListPreferredChannelsPaginated(ctx context.Context, arg sqlc.ListPreferredChannelsPaginatedParams) ([]sqlc.ListPreferredChannelsPaginatedRow, error) + UpsertPreferredChannel(ctx context.Context, scid []byte) error ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesForCachePaginatedParams) ([]sqlc.ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsPaginated(ctx 
context.Context, arg sqlc.ListChannelsPaginatedParams) ([]sqlc.ListChannelsPaginatedRow, error) ListChannelsPaginatedV2(ctx context.Context, arg sqlc.ListChannelsPaginatedV2Params) ([]sqlc.ListChannelsPaginatedV2Row, error) @@ -439,7 +442,13 @@ func (s *SQLStore) DeleteNode(ctx context.Context, v lnwire.GossipVersion, return fmt.Errorf("deleted %d rows, expected 1", rows) } - return err + // Recompute the preferred mapping. If another version of + // this node still exists, UpsertPreferredNode will point + // the mapping at it. If no version remains, the + // INSERT...SELECT is a no-op and the CASCADE on the FK + // already removed the mapping row when the node was + // deleted above. + return db.UpsertPreferredNode(ctx, pubKey[:]) }, sqldb.NoOpReset) if err != nil { return fmt.Errorf("unable to delete node: %w", err) @@ -1148,17 +1157,12 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context, // early. // // NOTE: part of the Store interface. -func (s *SQLStore) ForEachNode(ctx context.Context, v lnwire.GossipVersion, +func (s *SQLStore) ForEachNode(ctx context.Context, cb func(node *models.Node) error, reset func()) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachNodePaginated( - ctx, s.cfg.QueryCfg, db, - v, func(_ context.Context, _ int64, - node *models.Node) error { - - return cb(node) - }, + return forEachPreferredNodePaginated( + ctx, s.cfg.QueryCfg, db, cb, ) }, reset) } @@ -1782,7 +1786,7 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context, // page. allChannels, err := db.ListChannelsForNodeIDs( ctx, sqlc.ListChannelsForNodeIDsParams{ - Version: int16(lnwire.GossipVersion1), + Version: int16(v), Node1Ids: nodeIDs, Node2Ids: nodeIDs, }, @@ -2026,16 +2030,14 @@ func (s *SQLStore) ForEachChannelCacheable(ctx context.Context, // // NOTE: part of the Store interface. 
func (s *SQLStore) ForEachChannel(ctx context.Context, - v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error, reset func()) error { - if !isKnownGossipVersion(v) { - return fmt.Errorf("unsupported gossip version: %d", v) - } - return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachChannelWithPolicies(ctx, db, s.cfg, v, cb) + return forEachPreferredChannelWithPolicies( + ctx, db, s.cfg, cb, + ) }, reset) } @@ -2458,7 +2460,25 @@ func (s *SQLStore) DeleteChannelEdges(ctx context.Context, } } - return s.deleteChannels(ctx, db, chanIDsToDelete) + err = s.deleteChannels(ctx, db, chanIDsToDelete) + if err != nil { + return err + } + + // The CASCADE on graph_preferred_channels will have + // removed the mapping row for any deleted channel. If + // another version of the same SCID still exists, we + // need to re-insert the mapping. 
+ for _, chanID := range chanIDs { + scidBytes := channelIDToBytes(chanID) + err = db.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return fmt.Errorf("recalc preferred "+ + "channel(%d): %w", chanID, err) + } + } + + return nil }, func() { edges = nil @@ -2547,12 +2567,12 @@ func (s *SQLStore) FetchChannelEdgesByID(ctx context.Context, switch v { case gossipV1: edge, err = models.NewV1Channel( - 0, chainhash.Hash{}, node1, + chanID, chainhash.Hash{}, node1, node2, &models.ChannelV1Fields{}, ) case gossipV2: edge, err = models.NewV2Channel( - 0, chainhash.Hash{}, node1, + chanID, chainhash.Hash{}, node1, node2, &models.ChannelV2Fields{}, ) } @@ -2684,6 +2704,360 @@ func (s *SQLStore) FetchChannelEdgesByOutpoint(ctx context.Context, return edge, policy1, policy2, nil } +var ( + preferredGossipVersionsDescending = []lnwire.GossipVersion{ + gossipV2, gossipV1, + } + preferredGossipVersionsAscending = []lnwire.GossipVersion{ + gossipV1, gossipV2, + } +) + +// FetchChannelEdgesByIDPreferred tries each known gossip version from highest +// to lowest and returns the first result that has at least one policy. If no +// version has policies, the highest version found is returned. This prevents a +// v2 channel with no policies from hiding a v1 channel that has valid policy +// data. +// +// If no live edge is found across versions but at least one version reports +// the channel as a zombie, ErrZombieEdge is returned with the zombie edge info +// populated so callers can resurrect it. +// +// NOTE: part of the Store interface. 
+func (s *SQLStore) FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + var ( + bestInfo *models.ChannelEdgeInfo + bestP1 *models.ChannelEdgePolicy + bestP2 *models.ChannelEdgePolicy + bestZombie *models.ChannelEdgeInfo + chanIDB = channelIDToBytes(chanID) + buildLiveEdge = func(ctx context.Context, db SQLQueries, + row sqlc.GetChannelBySCIDWithPoliciesRow) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + node1, node2, err := buildNodeVertices( + row.GraphNode.PubKey, row.GraphNode_2.PubKey, + ) + if err != nil { + return nil, nil, nil, err + } + + edge, err := getAndBuildEdgeInfo( + ctx, s.cfg, db, row.GraphChannel, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "extract channel policies: %w", err) + } + + policy1, policy2, err := getAndBuildChanPolicies( + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, + edge.ChannelID, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel policies: %w", err) + } + + return edge, policy1, policy2, nil + } + ) + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsDescending { + row, err := db.GetChannelBySCIDWithPolicies( + ctx, sqlc.GetChannelBySCIDWithPoliciesParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + zombie, err := db.GetZombieChannel( + ctx, sqlc.GetZombieChannelParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to check if "+ + "channel is zombie: %w", err) + } + + if bestZombie == nil { + var err error + 
bestZombie, err = buildZombieEdge( + v, chanID, zombie.NodeKey1, + zombie.NodeKey2, + ) + if err != nil { + return err + } + } + + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + info, p1, p2, err := buildLiveEdge(ctx, db, row) + if err != nil { + return err + } + + if p1 != nil || p2 != nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + + return nil + } + + if bestInfo == nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + } + } + + if bestInfo != nil { + return nil + } + + if bestZombie != nil { + return ErrZombieEdge + } + + return ErrEdgeNotFound + }, sqldb.NoOpReset) + if errors.Is(err, ErrZombieEdge) { + return bestZombie, nil, nil, ErrZombieEdge + } + if err != nil { + return nil, nil, nil, fmt.Errorf("could not fetch preferred "+ + "channel: %w", err) + } + + return bestInfo, bestP1, bestP2, nil +} + +// FetchChannelEdgesByOutpointPreferred tries each known gossip version from +// highest to lowest and returns the first result that has at least one policy. +// If no version has policies, the highest version found is returned. This +// prevents a v2 channel with no policies from hiding a v1 channel that has +// valid policy data. +// +// NOTE: part of the Store interface. 
+func (s *SQLStore) FetchChannelEdgesByOutpointPreferred( + ctx context.Context, op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + var ( + bestInfo *models.ChannelEdgeInfo + bestP1 *models.ChannelEdgePolicy + bestP2 *models.ChannelEdgePolicy + buildLiveEdge = func(ctx context.Context, db SQLQueries, + row sqlc.GetChannelByOutpointWithPoliciesRow) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + node1, node2, err := buildNodeVertices( + row.Node1Pubkey, row.Node2Pubkey, + ) + if err != nil { + return nil, nil, nil, err + } + + edge, err := getAndBuildEdgeInfo( + ctx, s.cfg, db, row.GraphChannel, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "extract channel policies: %w", err) + } + + policy1, policy2, err := getAndBuildChanPolicies( + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, + edge.ChannelID, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel policies: %w", err) + } + + return edge, policy1, policy2, nil + } + ) + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsDescending { + params := sqlc.GetChannelByOutpointWithPoliciesParams{ + Outpoint: op.String(), + Version: int16(v), + } + row, err := db.GetChannelByOutpointWithPolicies( + ctx, params, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + info, p1, p2, err := buildLiveEdge(ctx, db, row) + if err != nil { + return err + } + + if p1 != nil || p2 != nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + + return nil + } + + if bestInfo == nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + } + } + + if 
bestInfo != nil { + return nil + } + + return ErrEdgeNotFound + }, sqldb.NoOpReset) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not fetch preferred "+ + "channel: %w", err) + } + + return bestInfo, bestP1, bestP2, nil +} + +// GetVersionsBySCID returns the gossip versions for which a channel with the +// given SCID exists in the database. +// +// NOTE: part of the Store interface. +func (s *SQLStore) GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) { + + var versions []lnwire.GossipVersion + chanIDB := channelIDToBytes(chanID) + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsAscending { + _, err := db.GetChannelBySCID( + ctx, sqlc.GetChannelBySCIDParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + versions = append(versions, v) + } + + return nil + }, sqldb.NoOpReset) + if err != nil { + return nil, err + } + + return versions, nil +} + +// GetVersionsByOutpoint returns the gossip versions for which a channel with +// the given funding outpoint exists in the database. +// +// NOTE: part of the Store interface. 
+func (s *SQLStore) GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) { + + var versions []lnwire.GossipVersion + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsAscending { + params := sqlc.GetChannelByOutpointWithPoliciesParams{ + Outpoint: op.String(), + Version: int16(v), + } + _, err := db.GetChannelByOutpointWithPolicies( + ctx, params, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + versions = append(versions, v) + } + + return nil + }, sqldb.NoOpReset) + if err != nil { + return nil, err + } + + return versions, nil +} + +func buildZombieEdge(v lnwire.GossipVersion, chanID uint64, nodeKey1, + nodeKey2 []byte) (*models.ChannelEdgeInfo, error) { + + node1, err := route.NewVertexFromBytes(nodeKey1) + if err != nil { + return nil, err + } + node2, err := route.NewVertexFromBytes(nodeKey2) + if err != nil { + return nil, err + } + + switch v { + case gossipV1: + return models.NewV1Channel( + chanID, chainhash.Hash{}, node1, node2, + &models.ChannelV1Fields{}, + ) + + case gossipV2: + return models.NewV2Channel( + chanID, chainhash.Hash{}, node1, node2, + &models.ChannelV2Fields{}, + ) + + default: + return nil, fmt.Errorf("unsupported gossip version: %d", v) + } +} + // HasV1ChannelEdge returns true if the database knows of a channel edge // with the passed channel ID, and false otherwise. If an edge with that ID // is found within the graph, then two time stamps representing the last time @@ -3343,6 +3717,10 @@ func (s *SQLStore) PruneGraph(ctx context.Context, return err } + // Delete all matched channels. GetChannelsByOutpoints + // returns every version for a given outpoint, so all + // versions are deleted and the CASCADE on + // graph_preferred_channels handles cleanup. 
err = s.deleteChannels(ctx, db, chansToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) @@ -3650,9 +4028,22 @@ func (s *SQLStore) pruneGraphNodes(ctx context.Context, "nodes: %w", err) } + // Recalc preferred node mappings for all affected pub_keys. The + // CASCADE may have removed some entries; if another version of the + // node still exists, UpsertPreferredNode will re-insert the mapping. + // If no version remains, the upsert is a no-op (the INSERT ... SELECT + // returns no rows). Note that nodeKeys may contain duplicates if a + // node existed in multiple gossip versions and was pruned in all of + // them; UpsertPreferredNode is idempotent, so this is harmless. prunedNodes := make([]route.Vertex, len(nodeKeys)) - for i, nodeKey := range nodeKeys { - pub, err := route.NewVertexFromBytes(nodeKey) + for i, key := range nodeKeys { + err = db.UpsertPreferredNode(ctx, key) + if err != nil { + return nil, fmt.Errorf("recalc preferred "+ + "node: %w", err) + } + + pub, err := route.NewVertexFromBytes(key) if err != nil { return nil, fmt.Errorf("unable to parse pubkey "+ "from bytes: %w", err) @@ -3727,6 +4118,10 @@ func (s *SQLStore) DisconnectBlockAtHeight(ctx context.Context, removedChans = channelEdges + // Delete all matched channels. GetChannelsBySCIDRange + // returns every version for a given SCID, so all versions + // are deleted and the CASCADE on + // graph_preferred_channels handles cleanup. err = s.deleteChannels(ctx, db, chanIDsToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) @@ -4309,6 +4704,15 @@ func updateChanEdgePolicy(ctx context.Context, tx SQLQueries, "policy extra TLVs: %w", err) } + // Adding a policy may change which version is preferred for this + // SCID (a version with policies outranks one without). 
+ scidBytes := channelIDToBytes(edge.ChannelID) + err = tx.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("upserting "+ + "preferred channel(%d): %w", edge.ChannelID, err) + } + return node1Pub, node2Pub, isNode1, nil } @@ -4662,6 +5066,13 @@ func upsertSourceNode(ctx context.Context, db SQLQueries, node.PubKeyBytes, err) } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + node.PubKeyBytes, err) + } + // We can exit here if we don't have the announcement yet. if !node.HaveAnnouncement() { return nodeID, nil @@ -4700,6 +5111,14 @@ func upsertNode(ctx context.Context, db SQLQueries, // We can exit here if we don't have the announcement yet. if !node.HaveAnnouncement() { + // Even a shell node may become the preferred entry for + // this pub_key (e.g. first v2 shell for a key). + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred "+ + "node(%x): %w", node.PubKeyBytes, err) + } + return nodeID, nil } @@ -4709,6 +5128,13 @@ func upsertNode(ctx context.Context, db SQLQueries, return 0, err } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + node.PubKeyBytes, err) + } + return nodeID, nil } @@ -5180,6 +5606,14 @@ func insertChannel(ctx context.Context, db SQLQueries, } } + // Recompute the preferred channel mapping for this SCID. 
+ scidBytes := channelIDToBytes(edge.ChannelID) + err = db.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return fmt.Errorf("upserting preferred channel(%d): %w", + edge.ChannelID, err) + } + return nil } @@ -5213,6 +5647,13 @@ func maybeCreateShellNode(ctx context.Context, db SQLQueries, return 0, fmt.Errorf("unable to create shell node: %w", err) } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, pubKey[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + pubKey[:], err) + } + return id, nil } @@ -5984,6 +6425,54 @@ func extractChannelPolicies(row any) (*sqlc.GraphChannelPolicy, return policy1, policy2, nil + case sqlc.ListPreferredChannelsPaginatedRow: + if r.Policy1ID.Valid { + policy1 = &sqlc.GraphChannelPolicy{ + ID: r.Policy1ID.Int64, + Version: r.Policy1Version.Int16, + ChannelID: r.GraphChannel.ID, + NodeID: r.Policy1NodeID.Int64, + Timelock: r.Policy1Timelock.Int32, + FeePpm: r.Policy1FeePpm.Int64, + BaseFeeMsat: r.Policy1BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy1MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy1MaxHtlcMsat, + LastUpdate: r.Policy1LastUpdate, + InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat, + Disabled: r.Policy1Disabled, + MessageFlags: r.Policy1MessageFlags, + ChannelFlags: r.Policy1ChannelFlags, + Signature: r.Policy1Signature, + BlockHeight: r.Policy1BlockHeight, + DisableFlags: r.Policy1DisableFlags, + } + } + if r.Policy2ID.Valid { + policy2 = &sqlc.GraphChannelPolicy{ + ID: r.Policy2ID.Int64, + Version: r.Policy2Version.Int16, + ChannelID: r.GraphChannel.ID, + NodeID: r.Policy2NodeID.Int64, + Timelock: r.Policy2Timelock.Int32, + FeePpm: r.Policy2FeePpm.Int64, + BaseFeeMsat: r.Policy2BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy2MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy2MaxHtlcMsat, + LastUpdate: r.Policy2LastUpdate, + InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat, + 
InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat, + Disabled: r.Policy2Disabled, + MessageFlags: r.Policy2MessageFlags, + ChannelFlags: r.Policy2ChannelFlags, + Signature: r.Policy2Signature, + BlockHeight: r.Policy2BlockHeight, + DisableFlags: r.Policy2DisableFlags, + } + } + + return policy1, policy2, nil + case sqlc.ListChannelsWithPoliciesPaginatedRow: if r.Policy1ID.Valid { policy1 = &sqlc.GraphChannelPolicy{ @@ -6553,32 +7042,32 @@ func batchLoadChannelPolicyExtrasHelper(ctx context.Context, ) } -// forEachNodePaginated executes a paginated query to process each node in the -// graph. It uses the provided SQLQueries interface to fetch nodes in batches -// and applies the provided processNode function to each node. -func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, - db SQLQueries, protocol lnwire.GossipVersion, - processNode func(context.Context, int64, - *models.Node) error) error { +// forEachPreferredNodePaginated executes a paginated query that yields one +// preferred node per pubkey across all gossip versions. 
+func forEachPreferredNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, + db SQLQueries, processNode func(*models.Node) error) error { - pageQueryFunc := func(ctx context.Context, lastID int64, - limit int32) ([]sqlc.GraphNode, error) { + pageQueryFunc := func(ctx context.Context, cursor []byte, + limit int32) ([]sqlc.ListPreferredNodesPaginatedRow, error) { - return db.ListNodesPaginated( - ctx, sqlc.ListNodesPaginatedParams{ - Version: int16(protocol), - ID: lastID, - Limit: limit, + return db.ListPreferredNodesPaginated( + ctx, sqlc.ListPreferredNodesPaginatedParams{ + PubKey: cursor, + Limit: limit, }, ) } - extractPageCursor := func(node sqlc.GraphNode) int64 { - return node.ID + extractPageCursor := func( + row sqlc.ListPreferredNodesPaginatedRow) []byte { + + return row.GraphNode.PubKey } - collectFunc := func(node sqlc.GraphNode) (int64, error) { - return node.ID, nil + collectFunc := func( + row sqlc.ListPreferredNodesPaginatedRow) (int64, error) { + + return row.GraphNode.ID, nil } batchQueryFunc := func(ctx context.Context, @@ -6587,29 +7076,32 @@ func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, return batchLoadNodeData(ctx, cfg, db, nodeIDs) } - processItem := func(ctx context.Context, dbNode sqlc.GraphNode, + processItem := func(_ context.Context, + row sqlc.ListPreferredNodesPaginatedRow, batchData *batchNodeData) error { + dbNode := row.GraphNode node, err := buildNodeWithBatchData(dbNode, batchData) if err != nil { - return fmt.Errorf("unable to build "+ - "node(id=%d): %w", dbNode.ID, err) + return fmt.Errorf("unable to build node(id=%d): %w", + dbNode.ID, err) } - return processNode(ctx, dbNode.ID, node) + return processNode(node) } return sqldb.ExecuteCollectAndBatchWithSharedDataQuery( - ctx, cfg, int64(-1), pageQueryFunc, extractPageCursor, + ctx, cfg, []byte{}, pageQueryFunc, extractPageCursor, collectFunc, batchQueryFunc, processItem, ) } -// forEachChannelWithPolicies executes a paginated query to process each 
channel -// with policies in the graph. -func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, - cfg *SQLStoreConfig, v lnwire.GossipVersion, - processChannel func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, +// forEachPreferredChannelWithPolicies executes a paginated query that yields +// one preferred channel per SCID across all gossip versions. +func forEachPreferredChannelWithPolicies(ctx context.Context, + db SQLQueries, cfg *SQLStoreConfig, + processChannel func(*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { type channelBatchIDs struct { @@ -6617,33 +7109,32 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, policyIDs []int64 } - pageQueryFunc := func(ctx context.Context, lastID int64, - limit int32) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, + pageQueryFunc := func(ctx context.Context, cursor []byte, + limit int32) ([]sqlc.ListPreferredChannelsPaginatedRow, error) { - return db.ListChannelsWithPoliciesPaginated( - ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{ - Version: int16(v), - ID: lastID, - Limit: limit, + return db.ListPreferredChannelsPaginated( + ctx, sqlc.ListPreferredChannelsPaginatedParams{ + Scid: cursor, + Limit: limit, }, ) } extractPageCursor := func( - row sqlc.ListChannelsWithPoliciesPaginatedRow) int64 { + row sqlc.ListPreferredChannelsPaginatedRow) []byte { - return row.GraphChannel.ID + return row.GraphChannel.Scid } - collectFunc := func(row sqlc.ListChannelsWithPoliciesPaginatedRow) ( + collectFunc := func( + row sqlc.ListPreferredChannelsPaginatedRow) ( channelBatchIDs, error) { ids := channelBatchIDs{ channelID: row.GraphChannel.ID, } - // Extract policy IDs from the row. 
dbPol1, dbPol2, err := extractChannelPolicies(row) if err != nil { return ids, err @@ -6679,7 +7170,7 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, } processItem := func(ctx context.Context, - row sqlc.ListChannelsWithPoliciesPaginatedRow, + row sqlc.ListPreferredChannelsPaginatedRow, batchData *batchChannelData) error { node1, node2, err := buildNodeVertices( @@ -6714,7 +7205,7 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, } return sqldb.ExecuteCollectAndBatchWithSharedDataQuery( - ctx, cfg.QueryCfg, int64(-1), pageQueryFunc, extractPageCursor, + ctx, cfg.QueryCfg, []byte{}, pageQueryFunc, extractPageCursor, collectFunc, batchDataFunc, processItem, ) } diff --git a/rpcserver.go b/rpcserver.go index d4a9ce51bb5..4d43180fe8c 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7000,11 +7000,10 @@ func (r *rpcServer) DescribeGraph(ctx context.Context, } } - // Obtain the pointer to the V1 channel graph. This will provide a - // consistent view of the graph due to bolt db's transactional model. - // - // TODO(elle): switch to a cross-version graph view when available. - graph := r.server.v1Graph + // Obtain the pointer to the cross-version channel graph. This will + // provide a consistent view of the graph due to bolt db's + // transactional model. + graph := r.server.graphDB // First iterate through all the known nodes (connected or unconnected // within the graph), collating their current state into the RPC diff --git a/sqldb/migrations.go b/sqldb/migrations.go index 241e5c0d683..afee7220294 100644 --- a/sqldb/migrations.go +++ b/sqldb/migrations.go @@ -136,6 +136,11 @@ var ( Version: 18, SchemaVersion: 15, }, + { + Name: "000016_graph_preferred_lookups", + Version: 19, + SchemaVersion: 16, + }, }, migrationAdditions...) 
// ErrMigrationMismatch is returned when a migrated record does not diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 703afd8f458..48d0f3284e6 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -4171,38 +4171,222 @@ func (q *Queries) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndP return items, nil } -const listNodesPaginated = `-- name: ListNodesPaginated :many -SELECT id, version, pub_key, alias, last_update, color, signature, block_height -FROM graph_nodes -WHERE version = $1 AND id > $2 -ORDER BY id -LIMIT $3 +const listPreferredChannelsPaginated = `-- name: ListPreferredChannelsPaginated :many +SELECT + c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, c.signature, c.funding_pk_script, c.merkle_root_hash, + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + cp1.block_height AS policy1_block_height, + cp1.disable_flags AS policy1_disable_flags, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS 
policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags, + cp2.signature AS policy_2_signature, + cp2.block_height AS policy_2_block_height, + cp2.disable_flags AS policy_2_disable_flags + +FROM graph_preferred_channels pc +JOIN graph_channels c ON c.id = pc.channel_id +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE pc.scid > $1 +ORDER BY pc.scid +LIMIT $2 ` -type ListNodesPaginatedParams struct { - Version int16 - ID int64 - Limit int32 +type ListPreferredChannelsPaginatedParams struct { + Scid []byte + Limit int32 +} + +type ListPreferredChannelsPaginatedRow struct { + GraphChannel GraphChannel + Node1Pubkey []byte + Node2Pubkey []byte + Policy1ID sql.NullInt64 + Policy1NodeID sql.NullInt64 + Policy1Version sql.NullInt16 + Policy1Timelock sql.NullInt32 + Policy1FeePpm sql.NullInt64 + Policy1BaseFeeMsat sql.NullInt64 + Policy1MinHtlcMsat sql.NullInt64 + Policy1MaxHtlcMsat sql.NullInt64 + Policy1LastUpdate sql.NullInt64 + Policy1Disabled sql.NullBool + Policy1InboundBaseFeeMsat sql.NullInt64 + Policy1InboundFeeRateMilliMsat sql.NullInt64 + Policy1MessageFlags sql.NullInt16 + Policy1ChannelFlags sql.NullInt16 + Policy1BlockHeight sql.NullInt64 + Policy1DisableFlags sql.NullInt16 + Policy1Signature []byte + Policy2ID sql.NullInt64 + Policy2NodeID sql.NullInt64 + Policy2Version sql.NullInt16 + Policy2Timelock 
sql.NullInt32 + Policy2FeePpm sql.NullInt64 + Policy2BaseFeeMsat sql.NullInt64 + Policy2MinHtlcMsat sql.NullInt64 + Policy2MaxHtlcMsat sql.NullInt64 + Policy2LastUpdate sql.NullInt64 + Policy2Disabled sql.NullBool + Policy2InboundBaseFeeMsat sql.NullInt64 + Policy2InboundFeeRateMilliMsat sql.NullInt64 + Policy2MessageFlags sql.NullInt16 + Policy2ChannelFlags sql.NullInt16 + Policy2Signature []byte + Policy2BlockHeight sql.NullInt64 + Policy2DisableFlags sql.NullInt16 } -func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) { - rows, err := q.db.QueryContext(ctx, listNodesPaginated, arg.Version, arg.ID, arg.Limit) +func (q *Queries) ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listPreferredChannelsPaginated, arg.Scid, arg.Limit) if err != nil { return nil, err } defer rows.Close() - var items []GraphNode + var items []ListPreferredChannelsPaginatedRow for rows.Next() { - var i GraphNode + var i ListPreferredChannelsPaginatedRow if err := rows.Scan( - &i.ID, - &i.Version, - &i.PubKey, - &i.Alias, - &i.LastUpdate, - &i.Color, - &i.Signature, - &i.BlockHeight, + &i.GraphChannel.ID, + &i.GraphChannel.Version, + &i.GraphChannel.Scid, + &i.GraphChannel.NodeID1, + &i.GraphChannel.NodeID2, + &i.GraphChannel.Outpoint, + &i.GraphChannel.Capacity, + &i.GraphChannel.BitcoinKey1, + &i.GraphChannel.BitcoinKey2, + &i.GraphChannel.Node1Signature, + &i.GraphChannel.Node2Signature, + &i.GraphChannel.Bitcoin1Signature, + &i.GraphChannel.Bitcoin2Signature, + &i.GraphChannel.Signature, + &i.GraphChannel.FundingPkScript, + &i.GraphChannel.MerkleRootHash, + &i.Node1Pubkey, + &i.Node2Pubkey, + &i.Policy1ID, + &i.Policy1NodeID, + &i.Policy1Version, + &i.Policy1Timelock, + &i.Policy1FeePpm, + &i.Policy1BaseFeeMsat, + &i.Policy1MinHtlcMsat, + &i.Policy1MaxHtlcMsat, + &i.Policy1LastUpdate, + 
&i.Policy1Disabled, + &i.Policy1InboundBaseFeeMsat, + &i.Policy1InboundFeeRateMilliMsat, + &i.Policy1MessageFlags, + &i.Policy1ChannelFlags, + &i.Policy1BlockHeight, + &i.Policy1DisableFlags, + &i.Policy1Signature, + &i.Policy2ID, + &i.Policy2NodeID, + &i.Policy2Version, + &i.Policy2Timelock, + &i.Policy2FeePpm, + &i.Policy2BaseFeeMsat, + &i.Policy2MinHtlcMsat, + &i.Policy2MaxHtlcMsat, + &i.Policy2LastUpdate, + &i.Policy2Disabled, + &i.Policy2InboundBaseFeeMsat, + &i.Policy2InboundFeeRateMilliMsat, + &i.Policy2MessageFlags, + &i.Policy2ChannelFlags, + &i.Policy2Signature, + &i.Policy2BlockHeight, + &i.Policy2DisableFlags, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listPreferredNodesPaginated = `-- name: ListPreferredNodesPaginated :many +SELECT n.id, n.version, n.pub_key, n.alias, n.last_update, n.color, n.signature, n.block_height +FROM graph_preferred_nodes pn +JOIN graph_nodes n ON n.id = pn.node_id +WHERE pn.pub_key > $1 +ORDER BY pn.pub_key +LIMIT $2 +` + +type ListPreferredNodesPaginatedParams struct { + PubKey []byte + Limit int32 +} + +type ListPreferredNodesPaginatedRow struct { + GraphNode GraphNode +} + +func (q *Queries) ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listPreferredNodesPaginated, arg.PubKey, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListPreferredNodesPaginatedRow + for rows.Next() { + var i ListPreferredNodesPaginatedRow + if err := rows.Scan( + &i.GraphNode.ID, + &i.GraphNode.Version, + &i.GraphNode.PubKey, + &i.GraphNode.Alias, + &i.GraphNode.LastUpdate, + &i.GraphNode.Color, + &i.GraphNode.Signature, + &i.GraphNode.BlockHeight, ); err != nil { return nil, err } @@ -4497,6 +4681,57 @@ func (q 
*Queries) UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTy return err } +const upsertPreferredChannel = `-- name: UpsertPreferredChannel :exec +/* ───────────────────────────────────────────── + graph_preferred_channels table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT c.scid, c.id +FROM graph_channels c +WHERE c.scid = $1 +ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id AND p.version = c.version + ) DESC, + c.version DESC +LIMIT 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id +` + +// Recompute the preferred channel for a given SCID and upsert the result. +// Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. +func (q *Queries) UpsertPreferredChannel(ctx context.Context, scid []byte) error { + _, err := q.db.ExecContext(ctx, upsertPreferredChannel, scid) + return err +} + +const upsertPreferredNode = `-- name: UpsertPreferredNode :exec +/* ───────────────────────────────────────────── + graph_preferred_nodes table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT n.pub_key, n.id +FROM graph_nodes n +WHERE n.pub_key = $1 +ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC +LIMIT 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id +` + +// Recompute the preferred node for a given pub_key and upsert the result. +// Priority: v2 announced > v1 announced > v2 shell > v1 shell. 
+func (q *Queries) UpsertPreferredNode(ctx context.Context, pubKey []byte) error {
+	_, err := q.db.ExecContext(ctx, upsertPreferredNode, pubKey)
+	return err
+}
+
 const upsertPruneLogEntry = `-- name: UpsertPruneLogEntry :exec
 /* ─────────────────────────────────────────────
    graph_prune_log table queries
diff --git a/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql
new file mode 100644
index 00000000000..2cfc0cd5af6
--- /dev/null
+++ b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql
@@ -0,0 +1,2 @@
+DROP TABLE IF EXISTS graph_preferred_channels;
+DROP TABLE IF EXISTS graph_preferred_nodes;
diff --git a/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql
new file mode 100644
index 00000000000..547e4971e2c
--- /dev/null
+++ b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql
@@ -0,0 +1,83 @@
+-- Preferred-node mapping: one row per unique pub_key pointing at the "best"
+-- node row across gossip versions. Priority: v2 announced > v1 announced >
+-- v2 shell > v1 shell.
+CREATE TABLE IF NOT EXISTS graph_preferred_nodes (
+    pub_key BLOB PRIMARY KEY,
+    node_id BIGINT NOT NULL REFERENCES graph_nodes(id) ON DELETE CASCADE
+);
+
+-- Index on node_id so cascade deletes from graph_nodes can locate the
+-- referencing rows without a sequential scan.
+CREATE INDEX IF NOT EXISTS graph_preferred_nodes_node_id_idx
+    ON graph_preferred_nodes (node_id);
+
+-- Preferred-channel mapping: one row per unique SCID pointing at the "best"
+-- channel row across gossip versions. Priority: v2 with policies >
+-- v1 with policies > v2 bare > v1 bare.
+CREATE TABLE IF NOT EXISTS graph_preferred_channels ( + scid BLOB PRIMARY KEY, + channel_id BIGINT NOT NULL REFERENCES graph_channels(id) ON DELETE CASCADE +); + +-- Index on channel_id so cascade deletes from graph_channels can locate +-- the referencing rows without a sequential scan. +CREATE INDEX IF NOT EXISTS graph_preferred_channels_channel_id_idx + ON graph_preferred_channels (channel_id); + +-- Populate graph_preferred_nodes from the graph_nodes rows that already +-- existed before this migration. The inner query ranks every node row within +-- each pub_key group. Announced nodes, identified by a non-empty signature, +-- outrank shell nodes, and higher gossip versions win within the same +-- announced/shell class. The outer INSERT keeps only rn = 1, leaving exactly +-- one preferred node_id per pub_key. +-- +-- The conflict clause makes this population step idempotent if the migration +-- is retried after the tables were created and partially populated. +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT sub.pub_key, sub.node_id +FROM ( + SELECT + n.pub_key, + n.id AS node_id, + ROW_NUMBER() OVER ( + PARTITION BY n.pub_key + ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC + ) AS rn + FROM graph_nodes n +) sub +WHERE sub.rn = 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id +WHERE graph_preferred_nodes.node_id <> EXCLUDED.node_id; + +-- Populate graph_preferred_channels from the graph_channels rows that already +-- existed before this migration. The inner query ranks every channel row +-- within each SCID group. A channel version with at least one policy row +-- outranks a bare channel version, and higher gossip versions win within the +-- same policy/bare class. The outer INSERT keeps only rn = 1, leaving exactly +-- one preferred channel_id per SCID. +-- +-- The conflict clause makes this population step idempotent if the migration +-- is retried after the tables were created and partially populated. 
+INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT sub.scid, sub.channel_id +FROM ( + SELECT + c.scid, + c.id AS channel_id, + ROW_NUMBER() OVER ( + PARTITION BY c.scid + ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id + AND p.version = c.version + ) DESC, + c.version DESC + ) AS rn + FROM graph_channels c +) sub +WHERE sub.rn = 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id +WHERE graph_preferred_channels.channel_id <> EXCLUDED.channel_id; diff --git a/sqldb/sqlc/models.go b/sqldb/sqlc/models.go index ef9aa9006f9..6df9fae7e35 100644 --- a/sqldb/sqlc/models.go +++ b/sqldb/sqlc/models.go @@ -123,6 +123,16 @@ type GraphNodeFeature struct { FeatureBit int32 } +type GraphPreferredChannel struct { + Scid []byte + ChannelID int64 +} + +type GraphPreferredNode struct { + PubKey []byte + NodeID int64 +} + type GraphPruneLog struct { BlockHeight int64 BlockHash []byte diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 9b95a669917..a49cb6d6b18 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -230,7 +230,8 @@ type Querier interface { ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg ListChannelsWithPoliciesForCachePaginatedParams) ([]ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error) - ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) + ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) + ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) NextInvoiceSettleIndex(ctx context.Context) (int64, 
error) NodeExists(ctx context.Context, arg NodeExistsParams) (bool, error) OnAMPSubInvoiceCanceled(ctx context.Context, arg OnAMPSubInvoiceCanceledParams) error @@ -255,6 +256,12 @@ type Querier interface { UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, error) UpsertNodeAddress(ctx context.Context, arg UpsertNodeAddressParams) error UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTypeParams) error + // Recompute the preferred channel for a given SCID and upsert the result. + // Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. + UpsertPreferredChannel(ctx context.Context, scid []byte) error + // Recompute the preferred node for a given pub_key and upsert the result. + // Priority: v2 announced > v1 announced > v2 shell > v1 shell. + UpsertPreferredNode(ctx context.Context, pubKey []byte) error UpsertPruneLogEntry(ctx context.Context, arg UpsertPruneLogEntryParams) error // We use a separate upsert for our own node since we want to be less strict // about the last_update field. 
For our own node, we always want to diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index a7683d1fbd9..9d55e2fecde 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -73,12 +73,13 @@ FROM graph_nodes WHERE pub_key = $1 AND version = $2; --- name: ListNodesPaginated :many -SELECT * -FROM graph_nodes -WHERE version = $1 AND id > $2 -ORDER BY id -LIMIT $3; +-- name: ListPreferredNodesPaginated :many +SELECT sqlc.embed(n) +FROM graph_preferred_nodes pn +JOIN graph_nodes n ON n.id = pn.node_id +WHERE pn.pub_key > $1 +ORDER BY pn.pub_key +LIMIT $2; -- name: ListNodeIDsAndPubKeys :many SELECT id, pub_key @@ -978,6 +979,64 @@ WHERE c.version = $1 AND c.id > $2 ORDER BY c.id LIMIT $3; +-- name: ListPreferredChannelsPaginated :many +SELECT + sqlc.embed(c), + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + cp1.block_height AS policy1_block_height, + cp1.disable_flags AS policy1_disable_flags, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + 
cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags, + cp2.signature AS policy_2_signature, + cp2.block_height AS policy_2_block_height, + cp2.disable_flags AS policy_2_disable_flags + +FROM graph_preferred_channels pc +JOIN graph_channels c ON c.id = pc.channel_id +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE pc.scid > $1 +ORDER BY pc.scid +LIMIT $2; + -- name: ListChannelsWithPoliciesForCachePaginated :many SELECT c.id as id, @@ -1435,3 +1494,42 @@ ON CONFLICT (channel_id, node_id, version) channel_flags = EXCLUDED.channel_flags, signature = EXCLUDED.signature RETURNING id; + +/* ───────────────────────────────────────────── + graph_preferred_nodes table queries + ───────────────────────────────────────────── +*/ + +-- name: UpsertPreferredNode :exec +-- Recompute the preferred node for a given pub_key and upsert the result. +-- Priority: v2 announced > v1 announced > v2 shell > v1 shell. +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT n.pub_key, n.id +FROM graph_nodes n +WHERE n.pub_key = $1 +ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC +LIMIT 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id; + +/* ───────────────────────────────────────────── + graph_preferred_channels table queries + ───────────────────────────────────────────── +*/ + +-- name: UpsertPreferredChannel :exec +-- Recompute the preferred channel for a given SCID and upsert the result. 
+-- Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT c.scid, c.id +FROM graph_channels c +WHERE c.scid = $1 +ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id AND p.version = c.version + ) DESC, + c.version DESC +LIMIT 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id;