Skip to content

Commit bc99e3e

Browse files
authored
Merge pull request #176 from asoberai/dv2
[fw] initial BIER support
2 parents 7ac25fb + 7f17d5c commit bc99e3e

38 files changed

Lines changed: 4525 additions & 160 deletions

cmd/cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func init() {
5151
CmdNDNd.AddCommand(tools.CmdPingServer())
5252
CmdNDNd.AddCommand(tools.CmdCatChunks())
5353
CmdNDNd.AddCommand(tools.CmdPutChunks())
54+
CmdNDNd.AddCommand(tools.CmdSvsChat())
5455
}
5556

5657
// (AI GENERATED DESCRIPTION): Creates the top‑level “fw” command for managing the NDN Forwarding Daemon, adding a “run” subcommand to start the daemon and a set of “nfdc” control subcommands for configuring it.

cmd/svs-chat/main.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"os"
7+
"time"
8+
9+
enc "github.com/named-data/ndnd/std/encoding"
10+
eng "github.com/named-data/ndnd/std/engine"
11+
"github.com/named-data/ndnd/std/log"
12+
"github.com/named-data/ndnd/std/ndn"
13+
"github.com/named-data/ndnd/std/object"
14+
"github.com/named-data/ndnd/std/object/storage"
15+
"github.com/named-data/ndnd/std/sync"
16+
)
17+
18+
func main() {
19+
log.Default().SetLevel(log.LevelTrace)
20+
prefixStr := flag.String("prefix", "/minindn/svs", "SVS group prefix")
21+
nameStr := flag.String("name", "", "Participant name")
22+
msgStr := flag.String("msg", "", "Message to send")
23+
delaySecs := flag.Int("delay", 0, "Wait N seconds before sending the message")
24+
waitSecs := flag.Int("wait", 10, "Seconds to wait before exiting")
25+
flag.Parse()
26+
27+
if *nameStr == "" {
28+
fmt.Println("Error: -name is required")
29+
os.Exit(1)
30+
}
31+
32+
groupPrefix, _ := enc.NameFromStr(*prefixStr)
33+
nodeName, _ := enc.NameFromStr(*nameStr)
34+
35+
app := eng.NewBasicEngine(eng.NewDefaultFace())
36+
if err := app.Start(); err != nil {
37+
fmt.Printf("Failed to start engine: %v\n", err)
38+
os.Exit(1)
39+
}
40+
defer app.Stop()
41+
42+
store := storage.NewMemoryStore()
43+
client := object.NewClient(app, store, nil)
44+
45+
err := client.Start()
46+
if err != nil {
47+
panic(err)
48+
}
49+
defer client.Stop()
50+
51+
syncPrefix := groupPrefix.Append(enc.NewKeywordComponent("svs"))
52+
client.AnnouncePrefix(ndn.Announcement{Name: syncPrefix, Expose: true, Multicast: true})
53+
54+
dataPrefix := groupPrefix.Append(nodeName...)
55+
client.AnnouncePrefix(ndn.Announcement{Name: dataPrefix, Expose: true})
56+
57+
alo, err := sync.NewSvsALO(sync.SvsAloOpts{
58+
Name: nodeName,
59+
Svs: sync.SvSyncOpts{
60+
Client: client,
61+
GroupPrefix: groupPrefix,
62+
BootTime: 1,
63+
},
64+
Snapshot: &sync.SnapshotNodeLatest{
65+
Client: client,
66+
SnapMe: func(_ enc.Name) (enc.Wire, error) {
67+
if *msgStr != "" {
68+
return enc.Wire{[]byte(*msgStr)}, nil
69+
}
70+
return enc.Wire{[]byte("(no message)")}, nil
71+
},
72+
Threshold: 5,
73+
},
74+
})
75+
if err != nil {
76+
panic(err)
77+
}
78+
79+
alo.SetOnPublisher(func(name enc.Name) {
80+
alo.SubscribePublisher(name, func(pub sync.SvsPub) {
81+
fmt.Printf("CHAT %s: %s\n", pub.Publisher.String(), string(pub.Bytes()))
82+
})
83+
})
84+
85+
if err := alo.Start(); err != nil {
86+
panic(err)
87+
}
88+
89+
// Important: let the sync initialization happen
90+
time.Sleep(2 * time.Second)
91+
92+
if *msgStr != "" {
93+
if *delaySecs > 0 {
94+
time.Sleep(time.Duration(*delaySecs) * time.Second)
95+
}
96+
_, _, err := alo.Publish(enc.Wire{[]byte(*msgStr)})
97+
if err != nil {
98+
fmt.Printf("Publish error: %v\n", err)
99+
} else {
100+
fmt.Printf("Published message: %s\n", *msgStr)
101+
}
102+
}
103+
104+
time.Sleep(time.Duration(*waitSecs) * time.Second)
105+
alo.Stop()
106+
}

dv/dv/insertion.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (pfx *PrefixModule) onPrefixInsertionObject(object ndn.Data, faceId uint64)
213213
return resError
214214
}
215215

216-
pfx.Announce(prefix, faceId, cost, params.ValidityPeriod)
216+
pfx.Announce(prefix, faceId, cost, false, params.ValidityPeriod)
217217

218218
return &mgmt.ControlResponse{
219219
Val: &mgmt.ControlResponseVal{

dv/dv/mgmt.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,9 @@ func (dv *Router) mgmtOnPrefix(args ndn.InterestHandlerArgs) {
292292
responseParams.ExpirationPeriod = optional.Some(expires)
293293
}
294294

295+
multicast := params.Val.Flags.GetOr(0)&1 != 0
295296
dv.mutex.Lock()
296-
dv.pfx.Announce(name, faceID, cost, validity)
297+
dv.pfx.Announce(name, faceID, cost, multicast, validity)
297298
dv.mutex.Unlock()
298299

299300
res.Val.StatusCode = 200

dv/dv/prefix.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ type PrefixModule struct {
4545
}
4646

4747
type petEgressOp struct {
48-
add bool
49-
name enc.Name
50-
egress enc.Name
48+
add bool
49+
name enc.Name
50+
egress enc.Name
51+
multicast bool
5152
}
5253

5354
type petNextHopOp struct {
@@ -225,10 +226,14 @@ func (pfx *PrefixModule) Reset() {
225226

226227
// Announce adds or updates a local prefix in prefix egress state.
227228
// Use face=0 and cost=0 for route-only semantics.
228-
func (pfx *PrefixModule) Announce(name enc.Name, face uint64, cost uint64, validity *spec.ValidityPeriod) {
229+
func (pfx *PrefixModule) Announce(name enc.Name, face uint64, cost uint64, multicast bool, validity *spec.ValidityPeriod) {
230+
pfx.announce(name, face, cost, multicast, validity)
231+
}
232+
233+
func (pfx *PrefixModule) announce(name enc.Name, face uint64, cost uint64, multicast bool, validity *spec.ValidityPeriod) {
229234
pfx.mu.Lock()
230-
petOps := pfx.addRouterPrefixPet(pfx.routerName, name)
231-
pfx.pfx.Announce(name, face, cost, validity)
235+
petOps := pfx.addRouterPrefixPet(pfx.routerName, name, multicast)
236+
pfx.pfx.Announce(name, face, cost, multicast, validity)
232237
pfx.mu.Unlock()
233238

234239
pfx.applyPetOps(petOps)
@@ -330,7 +335,7 @@ func (pfx *PrefixModule) processUpdate(wire enc.Wire) (dirty bool, petOps []petE
330335
petOps = append(petOps, pfx.resetRouterPet(router)...)
331336
}
332337
for _, add := range ops.PrefixOpAdds {
333-
petOps = append(petOps, pfx.addRouterPrefixPet(router, add.Name)...)
338+
petOps = append(petOps, pfx.addRouterPrefixPet(router, add.Name, add.Multicast)...)
334339
}
335340
for _, remove := range ops.PrefixOpRemoves {
336341
petOps = append(petOps, pfx.removeRouterPrefixPet(router, remove.Name)...)
@@ -360,7 +365,7 @@ func (pfx *PrefixModule) resetRouterPet(router enc.Name) []petEgressOp {
360365
return ops
361366
}
362367

363-
func (pfx *PrefixModule) addRouterPrefixPet(router enc.Name, prefix enc.Name) []petEgressOp {
368+
func (pfx *PrefixModule) addRouterPrefixPet(router enc.Name, prefix enc.Name, multicast bool) []petEgressOp {
364369
routerHash := router.Hash()
365370
prefixes := pfx.petPrefixes[routerHash]
366371
if prefixes == nil {
@@ -376,9 +381,10 @@ func (pfx *PrefixModule) addRouterPrefixPet(router enc.Name, prefix enc.Name) []
376381

377382
egress := router.Clone()
378383
return []petEgressOp{{
379-
add: true,
380-
name: prefix.Clone(),
381-
egress: egress,
384+
add: true,
385+
name: prefix.Clone(),
386+
egress: egress,
387+
multicast: multicast,
382388
}}
383389
}
384390

@@ -413,17 +419,22 @@ func (pfx *PrefixModule) applyPetOps(ops []petEgressOp) {
413419

414420
for _, op := range ops {
415421
cmd := "remove-egress"
422+
args := &mgmt.ControlArgs{
423+
Name: op.name,
424+
Egress: &mgmt.EgressRecord{Name: op.egress},
425+
}
416426
if op.add {
417427
cmd = "add-egress"
428+
// Signal the Sync group (multicast) flag to the forwarder's PET via Flags bit 0.
429+
if op.multicast {
430+
args.Flags = optional.Some(uint64(1))
431+
}
418432
}
419433

420434
pfx.nfdc.Exec(nfdc.NfdMgmtCmd{
421-
Module: "pet",
422-
Cmd: cmd,
423-
Args: &mgmt.ControlArgs{
424-
Name: op.name,
425-
Egress: &mgmt.EgressRecord{Name: op.egress},
426-
},
435+
Module: "pet",
436+
Cmd: cmd,
437+
Args: args,
427438
Retries: -1,
428439
})
429440
}

dv/dv/table_algo.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dv
33
import (
44
"github.com/named-data/ndnd/dv/config"
55
"github.com/named-data/ndnd/dv/table"
6+
fw "github.com/named-data/ndnd/fw/fw"
67
enc "github.com/named-data/ndnd/std/encoding"
78
"github.com/named-data/ndnd/std/log"
89
)
@@ -134,4 +135,10 @@ func (dv *Router) updateFib() {
134135
}
135136
}
136137
dv.fib.RemoveUnmarked()
138+
139+
// Rebuild the BIFT whenever the FIB changes so BIER forwarding paths
140+
// are always consistent with the routing table.
141+
if fw.IsBierEnabled() {
142+
fw.Bift.BuildFromFib()
143+
}
137144
}

dv/table/prefix_state.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,21 @@ func (pt *PrefixEgreState) Reset() {
9494

9595
// Announce updates or creates a local prefix with an optional validity period.
9696
// Use face=0 and cost=0 for route-only semantics.
97-
func (pt *PrefixEgreState) Announce(name enc.Name, face uint64, cost uint64, validity *spec.ValidityPeriod) {
97+
// multicast=true marks this as a Sync group prefix (vs. a producer prefix).
98+
func (pt *PrefixEgreState) Announce(name enc.Name, face uint64, cost uint64, multicast bool, validity *spec.ValidityPeriod) {
9899
hash := name.TlvStr()
99100
entry := pt.me.Prefixes[hash]
100101
publishAdd := false
101102
if entry == nil {
102103
entry = &PrefixEntry{
103-
Name: name,
104+
Name: name,
105+
Multicast: multicast,
104106
}
105107
pt.me.Prefixes[hash] = entry
106108
publishAdd = true
109+
} else if multicast && !entry.Multicast {
110+
entry.Multicast = true
111+
publishAdd = true
107112
}
108113

109114
if !sameValidityPeriod(entry.ValidityPeriod, validity) {

e2e-run.sh

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/usr/bin/env bash
2+
# e2e-run.sh — Docker entrypoint: install binaries and run e2e tests.
3+
set -euo pipefail
4+
5+
log() { echo "==> $*"; }
6+
7+
# OpenVSwitch is required for Mininet virtual switches.
8+
# ovs-vsctl can hang on macOS linuxkit kernels, so both calls are guarded with timeout.
9+
log "Starting OpenVSwitch..."
10+
timeout 15 service openvswitch-switch start 2>/dev/null || true
11+
timeout 5 ovs-vsctl set-manager ptcp:6640 2>/dev/null || true
12+
13+
log "Installing binaries..."
14+
install -m 755 /ndnd/.bin/ndnd /usr/local/bin/ndnd
15+
log " ndnd: $(ndnd --version 2>&1 | head -1 || echo unknown)"
16+
17+
# Mini-NDN's NFD config template needs readvertise_nlsr enabled.
18+
sed -i 's/readvertise_nlsr no/readvertise_nlsr yes/g' \
19+
/usr/local/etc/ndn/nfd.conf.sample 2>/dev/null || true
20+
21+
# runner.py calls `go build` to ensure fresh binaries.
22+
# Pre-built binaries are already in .bin/ so we shim `go` to be a no-op.
23+
mkdir -p /usr/local/fake-go
24+
cat > /usr/local/fake-go/go << 'EOF'
25+
#!/bin/bash
26+
exit 0
27+
EOF
28+
chmod +x /usr/local/fake-go/go
29+
export PATH="/usr/local/fake-go:$PATH"
30+
31+
log "Running e2e tests..."
32+
cd /ndnd
33+
python3 e2e/runner.py e2e/topo.sprint.conf

e2e/dv_util.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,25 @@ def wait_prefix_pet_ready(node_to_prefixes: dict[Node, set[str]], deadline=30) -
8888
time.sleep(1)
8989

9090
raise Exception('PET prefix replication did not converge')
91+
92+
def populate_bift(nodes: list[Node], bier_map: dict, network=DEFAULT_NETWORK):
93+
info(f'Deploying BIER indices to BIFT ({len(bier_map)} routers)...\n')
94+
for node in nodes:
95+
for router, idx in bier_map.items():
96+
router_name = f'{network}/{router.name}'
97+
node.cmd(f'ndnd fw bift-register prefix="{router_name}" cost={idx}')
98+
node.cmd('ndnd fw bift-rebuild')
99+
info('BIFT populated on all nodes\n')
100+
101+
102+
def dump_bier_logs(nodes: list[Node], label: str = '', lines: int = 40) -> str:
103+
"""Return BIER-relevant log lines from each node's ndnd log (for failure diagnostics)."""
104+
out = f'\n=== BIER forwarder logs{" (" + label + ")" if label else ""} ===\n'
105+
for node in nodes:
106+
log = node.cmd(
107+
f'cat /tmp/minindn/{node.name}/log/yanfd.log'
108+
f' 2>/dev/null | grep -iE "bier|bift|bfir|bfr|bfer|strategy" | tail -{lines}'
109+
)
110+
if log.strip():
111+
out += f'--- {node.name} ---\n{log}\n'
112+
return out

e2e/fw.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from minindn.apps.application import Application
66

77
class NDNd_FW(Application):
8-
def __init__(self, node, config={}, logLevel='INFO', threads=2, network='/minindn'):
8+
def __init__(self, node, config={}, logLevel='INFO', threads=2, network='/minindn', bier_index=-1):
99
Application.__init__(self, node)
1010

1111
if not shutil.which('ndnd'):
@@ -37,6 +37,8 @@ def __init__(self, node, config={}, logLevel='INFO', threads=2, network='/minind
3737
'threads': threads,
3838
# Two-phase PET forwarding needs the local router identity.
3939
'router_name': f'{network}/{node.name}',
40+
# BIER bit index for this router (-1 = disabled)
41+
'bier_index': bier_index,
4042
},
4143
}
4244

0 commit comments

Comments
 (0)