Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/release-notes/release-notes-0.21.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,12 @@
fallback](https://github.com/lightningnetwork/lnd/pull/10717) so that gossip
channel filtering and zombie edge lookups use the correct gossip version
instead of hardcoding v1.
* Make the [graph `Store` interface
cross-version](https://github.com/lightningnetwork/lnd/pull/10714) so that
`ForEachNode`, `ForEachChannel`, and `ForEachNodeDirectedChannel` work across
gossip v1 and v2. Add `Preferred` fetch helpers and `GetVersions` queries
so callers can retrieve channels without knowing which gossip version
announced them.
* Updated waiting proof persistence for gossip upgrades by introducing typed
waiting proof keys and payloads, with a DB migration to rewrite legacy
waiting proof records to the new key/value format
Expand Down
20 changes: 9 additions & 11 deletions graph/db/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,7 @@ func TestPopulateDBs(t *testing.T) {
// graph.
countNodes := func(graph *ChannelGraph) int {
numNodes := 0
v1Graph := NewVersionedGraph(graph, lnwire.GossipVersion1)
err := v1Graph.ForEachNode(
err := graph.ForEachNode(
ctx,
func(node *models.Node) error {
numNodes++
Expand All @@ -372,7 +371,7 @@ func TestPopulateDBs(t *testing.T) {
numPolicies = 0
)
err := graph.ForEachChannel(
ctx, lnwire.GossipVersion1,
ctx,
func(info *models.ChannelEdgeInfo, policy,
policy2 *models.ChannelEdgePolicy) error {

Expand Down Expand Up @@ -500,7 +499,7 @@ func syncGraph(t *testing.T, src, dest *ChannelGraph) {
}

var wgChans sync.WaitGroup
err = src.ForEachChannel(ctx, lnwire.GossipVersion1,
err = src.ForEachChannel(ctx,
func(info *models.ChannelEdgeInfo,
policy1, policy2 *models.ChannelEdgePolicy) error {

Expand Down Expand Up @@ -624,7 +623,7 @@ func BenchmarkGraphReadMethods(b *testing.B) {
name: "ForEachNode",
fn: func(b testing.TB, store Store) {
err := store.ForEachNode(
ctx, lnwire.GossipVersion1,
ctx,
func(_ *models.Node) error {
// Increment the counter to
// ensure the callback is doing
Expand All @@ -640,12 +639,11 @@ func BenchmarkGraphReadMethods(b *testing.B) {
{
name: "ForEachChannel",
fn: func(b testing.TB, store Store) {
//nolint:ll
err := store.ForEachChannel(
ctx, lnwire.GossipVersion1,
err := store.ForEachChannel(ctx,
func(_ *models.ChannelEdgeInfo,
_,
_ *models.ChannelEdgePolicy,
_ *models.ChannelEdgePolicy) error {
) error {

// Increment the counter to
// ensure the callback is doing
Expand Down Expand Up @@ -996,7 +994,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) {
)

err := store.ForEachNode(
ctx, lnwire.GossipVersion1,
ctx,
func(_ *models.Node) error {
numNodes++

Expand All @@ -1007,7 +1005,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) {

//nolint:ll
err = store.ForEachChannel(
ctx, lnwire.GossipVersion1,
ctx,
func(_ *models.ChannelEdgeInfo,
_,
_ *models.ChannelEdgePolicy) error {
Expand Down
88 changes: 51 additions & 37 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,21 @@ func (c *ChannelGraph) populateCache(ctx context.Context) error {
for _, v := range []lnwire.GossipVersion{
gossipV1, gossipV2,
} {
// TODO(elle): If we have both v1 and v2 entries for the same
// node/channel, prefer v2 when merging.
// We iterate v1 first, then v2. AddNodeFeatures and AddChannel
// overwrite on key collision, so v2 data takes precedence when
// both versions exist. For features specifically we
// additionally skip empty v2 entries so they don't shadow a
// non-empty v1 feature set; this matches the no-cache
// FetchNodeFeatures fallback rule that a non-empty
// lower-version vector wins over an empty higher-version one.
err := c.db.ForEachNodeCacheable(ctx, v,
func(node route.Vertex,
features *lnwire.FeatureVector) error {

if v == gossipV2 && features.IsEmpty() {
return nil
}

cache.AddNodeFeatures(node, features)

return nil
Expand Down Expand Up @@ -299,9 +308,8 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(ctx context.Context,
return c.cache.graphCache.ForEachChannel(node, cb)
}

// TODO(elle): once the no-cache path needs to support
// pathfinding across gossip versions, this should iterate
// across all versions rather than defaulting to v1.
// The no-cache path only runs against the KV backend, which is
// v1-only.
return c.db.ForEachNodeDirectedChannel(
ctx, gossipV1, node, cb, reset,
)
Expand All @@ -320,7 +328,9 @@ func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context,
return c.cache.graphCache.GetFeatures(node), nil
}

return c.db.FetchNodeFeatures(ctx, lnwire.GossipVersion1, node)
// The no-cache path only runs against the KV backend, which is
// v1-only.
return c.db.FetchNodeFeatures(ctx, gossipV1, node)
}

// GraphSession will provide the call-back with access to a NodeTraverser
Expand Down Expand Up @@ -679,13 +689,22 @@ func (c *ChannelGraph) HasV1Node(ctx context.Context,
return c.db.HasV1Node(ctx, nodePub)
}

// ForEachChannel iterates through all channel edges stored within the graph.
// ForEachChannel iterates through all channel edges stored within the graph
// across all gossip versions.
func (c *ChannelGraph) ForEachChannel(ctx context.Context,
v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo,
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error,
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error,
reset func()) error {

return c.db.ForEachChannel(ctx, v, cb, reset)
return c.db.ForEachChannel(ctx, cb, reset)
}

// ForEachNode iterates through all stored vertices/nodes in the graph across
// all gossip versions.
func (c *ChannelGraph) ForEachNode(ctx context.Context,
cb func(*models.Node) error, reset func()) error {

return c.db.ForEachNode(ctx, cb, reset)
}

// DisabledChannelIDs returns the channel ids of disabled channels.
Expand Down Expand Up @@ -817,26 +836,39 @@ func (c *ChannelGraph) FetchChanInfos(ctx context.Context,
}

// FetchChannelEdgesByOutpoint attempts to lookup directed edges by funding
// outpoint.
// outpoint, returning the highest available gossip version.
func (c *ChannelGraph) FetchChannelEdgesByOutpoint(ctx context.Context,
op *wire.OutPoint) (
*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy, error) {

return c.db.FetchChannelEdgesByOutpoint(
ctx, lnwire.GossipVersion1, op,
)
return c.db.FetchChannelEdgesByOutpointPreferred(ctx, op)
}

// FetchChannelEdgesByID attempts to lookup directed edges by channel ID.
// FetchChannelEdgesByID attempts to lookup directed edges by channel ID,
// returning the highest available gossip version.
func (c *ChannelGraph) FetchChannelEdgesByID(ctx context.Context,
chanID uint64) (
*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy, error) {

return c.db.FetchChannelEdgesByID(
ctx, lnwire.GossipVersion1, chanID,
)
return c.db.FetchChannelEdgesByIDPreferred(ctx, chanID)
}

// GetVersionsBySCID returns the list of gossip versions for which a channel
// with the given SCID exists in the database.
func (c *ChannelGraph) GetVersionsBySCID(ctx context.Context,
chanID uint64) ([]lnwire.GossipVersion, error) {

return c.db.GetVersionsBySCID(ctx, chanID)
}

// GetVersionsByOutpoint returns the list of gossip versions for which a channel
// with the given funding outpoint exists in the database.
func (c *ChannelGraph) GetVersionsByOutpoint(ctx context.Context,
op *wire.OutPoint) ([]lnwire.GossipVersion, error) {

return c.db.GetVersionsByOutpoint(ctx, op)
}

// PutClosedScid stores a SCID for a closed channel in the database.
Expand Down Expand Up @@ -927,13 +959,6 @@ func (c *VersionedGraph) ForEachNodeCached(ctx context.Context,
return c.ChannelGraph.ForEachNodeCached(ctx, c.v, withAddrs, cb, reset)
}

// ForEachNode iterates through all stored vertices/nodes in the graph.
func (c *VersionedGraph) ForEachNode(ctx context.Context,
cb func(*models.Node) error, reset func()) error {

return c.db.ForEachNode(ctx, c.v, cb, reset)
}

// NumZombies returns the current number of zombie channels in the graph.
func (c *VersionedGraph) NumZombies(ctx context.Context) (uint64, error) {
return c.db.NumZombies(ctx, c.v)
Expand Down Expand Up @@ -969,17 +994,14 @@ func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint,
// for performing queries against the channel graph. If the graph cache is
// enabled, the callback receives the VersionedGraph directly (which implements
// NodeTraverser using the cache). Otherwise a read-only database session is
// used.
// used; the no-cache path only runs against the KV backend, which is v1-only.
func (c *VersionedGraph) GraphSession(ctx context.Context,
cb func(graph NodeTraverser) error, reset func()) error {

if c.cache != nil && c.cache.isLoaded() {
return cb(c)
}

// TODO(elle): the underlying GraphSession currently creates a
// NodeTraverser that is hardcoded to GossipVersion1. This needs to be
// updated to pass the version through for v2 support.
return c.db.GraphSession(ctx, cb, reset)
}

Expand Down Expand Up @@ -1107,14 +1129,6 @@ func (c *VersionedGraph) ForEachNodeChannel(ctx context.Context,
return c.db.ForEachNodeChannel(ctx, c.v, nodePub, cb, reset)
}

// ForEachChannel iterates through all channel edges stored within the graph.
func (c *VersionedGraph) ForEachChannel(ctx context.Context,
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error, reset func()) error {

return c.db.ForEachChannel(ctx, c.v, cb, reset)
}

// ForEachNodeCacheable iterates through all stored vertices/nodes in the graph.
func (c *VersionedGraph) ForEachNodeCacheable(ctx context.Context,
cb func(route.Vertex, *lnwire.FeatureVector) error,
Expand Down
Loading
Loading