Skip to content

Commit 76aeb89

Browse files
authored
Merge pull request #179 from r2dev2/r2dev2/three-pipeline-forwarding
[fw] refactor incoming Interest processing
2 parents bc99e3e + 799fcca commit 76aeb89

38 files changed

Lines changed: 1199 additions & 1057 deletions

.github/workflows/test.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,10 @@ jobs:
8989
9090
- name: Run e2e tests
9191
run: make e2e
92+
93+
- name: Upload minindn logs on failure
94+
if: failure()
95+
uses: actions/upload-artifact@v4
96+
with:
97+
name: minindn-logs
98+
path: /tmp/minindn

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ ndnd: clean
99
CGO_ENABLED=0 go build -o ndnd \
1010
-ldflags "-X '${STD_PACKAGE}/utils.NDNdVersion=${VERSION}'" \
1111
cmd/ndnd/main.go
12+
mkdir -p .bin/
13+
cp ndnd .bin/
1214

1315
examples: .bin/alo-latest
1416

dv/config/config.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@ var BroadcastStrategy = enc.LOCALHOST.
3232
Append(enc.NewGenericComponent("strategy")).
3333
Append(enc.NewGenericComponent("broadcast"))
3434

35-
var ReplicastStrategy = enc.LOCALHOST.
36-
Append(enc.NewGenericComponent("nfd")).
37-
Append(enc.NewGenericComponent("strategy")).
38-
Append(enc.NewGenericComponent("replicast"))
39-
4035
//go:embed schema.tlv
4136
var SchemaBytes []byte
4237

dv/dv/mgmt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ func (dv *Router) mgmtOnPrefix(args ndn.InterestHandlerArgs) {
292292
responseParams.ExpirationPeriod = optional.Some(expires)
293293
}
294294

295-
multicast := params.Val.Flags.GetOr(0)&1 != 0
295+
multicast := params.Val.Multicast
296296
dv.mutex.Lock()
297297
dv.pfx.Announce(name, faceID, cost, multicast, validity)
298298
dv.mutex.Unlock()

dv/dv/prefix.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,9 +425,8 @@ func (pfx *PrefixModule) applyPetOps(ops []petEgressOp) {
425425
}
426426
if op.add {
427427
cmd = "add-egress"
428-
// Signal the Sync group (multicast) flag to the forwarder's PET via Flags bit 0.
429428
if op.multicast {
430-
args.Flags = optional.Some(uint64(1))
429+
args.Multicast = true
431430
}
432431
}
433432

dv/dv/router.go

Lines changed: 39 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/named-data/ndnd/dv/config"
99
"github.com/named-data/ndnd/dv/nfdc"
1010
"github.com/named-data/ndnd/dv/table"
11+
"github.com/named-data/ndnd/fw/defn"
1112
enc "github.com/named-data/ndnd/std/encoding"
1213
"github.com/named-data/ndnd/std/log"
1314
"github.com/named-data/ndnd/std/ndn"
@@ -285,64 +286,51 @@ func (dv *Router) register() (err error) {
285286
}
286287

287288
for _, prefix := range pfxs {
288-
dv.nfdc.Exec(nfdc.NfdMgmtCmd{
289-
Module: "pet",
290-
Cmd: "add-nexthop",
291-
Args: &mgmt.ControlArgs{
292-
Name: prefix,
293-
},
294-
Retries: -1,
289+
dv.execMgmtRetry("pet", "add-nexthop", &mgmt.ControlArgs{
290+
Name: prefix,
295291
})
296292
}
297293
// Allow outgoing local-prefix-sync Interests to use two-phase forwarding.
298294
// Incoming Interests still terminate locally on the same prefix.
299-
dv.nfdc.Exec(nfdc.NfdMgmtCmd{
300-
Module: "pet",
301-
Cmd: "add-egress",
302-
Args: &mgmt.ControlArgs{
303-
Name: dv.pfx.SyncPrefix(),
304-
Egress: &mgmt.EgressRecord{Name: neighborsPrefix.Clone()},
305-
},
306-
Retries: -1,
295+
dv.execMgmtRetry("pet", "add-egress", &mgmt.ControlArgs{
296+
Name: dv.pfx.SyncPrefix(),
297+
Egress: &mgmt.EgressRecord{Name: neighborsPrefix.Clone()},
298+
Multicast: true,
307299
})
308300
// Set Advertisement Sync to localhop neighbors
309-
dv.nfdc.Exec(nfdc.NfdMgmtCmd{
310-
Module: "pet",
311-
Cmd: "add-egress",
312-
Args: &mgmt.ControlArgs{
313-
Name: dv.config.AdvertisementSyncPrefix(),
314-
Egress: &mgmt.EgressRecord{Name: neighborsPrefix.Clone()},
315-
},
316-
Retries: -1,
317-
})
318-
// Set strategy to broadcast for advertisement sync Interests, so
319-
// /localhop/.../DV/ADS traffic fan-outs to all neighbor nexthops.
320-
dv.nfdc.Exec(nfdc.NfdMgmtCmd{
321-
Module: "strategy-choice",
322-
Cmd: "set",
323-
Args: &mgmt.ControlArgs{
324-
Name: dv.config.AdvertisementSyncPrefix(),
325-
Strategy: &mgmt.Strategy{
326-
Name: config.BroadcastStrategy,
327-
},
328-
},
329-
Retries: -1,
330-
})
331-
dv.nfdc.Exec(nfdc.NfdMgmtCmd{
332-
Module: "strategy-choice",
333-
Cmd: "set",
334-
Args: &mgmt.ControlArgs{
335-
Name: dv.pfx.SyncPrefix(),
336-
Strategy: &mgmt.Strategy{
337-
Name: config.BroadcastStrategy,
338-
},
339-
},
340-
Retries: -1,
301+
dv.execMgmtRetry("pet", "add-egress", &mgmt.ControlArgs{
302+
Name: dv.config.AdvertisementSyncPrefix(),
303+
Egress: &mgmt.EgressRecord{Name: neighborsPrefix.Clone()},
304+
Multicast: true,
341305
})
342306

307+
// Force multicast strategy for sync prefixes to broadcast.
308+
broadcastPrefixes := []enc.Name{
309+
dv.pfx.SyncPrefix(),
310+
dv.config.AdvertisementSyncPrefix(),
311+
}
312+
for _, prefix := range broadcastPrefixes {
313+
dv.execMgmtRetry("multicast-strategy-choice", "set", &mgmt.ControlArgs{
314+
Name: prefix,
315+
Strategy: &mgmt.Strategy{Name: defn.BROADCAST_STRATEGY},
316+
})
317+
}
318+
343319
return nil
344320
}
345321

322+
func (dv *Router) execMgmtRetry(module, cmd string, args *mgmt.ControlArgs) {
323+
var err error
324+
for i := 0; ; i++ {
325+
if _, err = dv.engine.ExecMgmtCmd(module, cmd, args); err == nil {
326+
break
327+
}
328+
log.Error(dv, "Forwarder command failed", "err", err, "attempt", i,
329+
"module", module, "cmd", cmd, "args", args)
330+
time.Sleep(100 * time.Millisecond)
331+
}
332+
}
333+
346334
// createFaces creates faces to all neighbors.
347335
func (dv *Router) createFaces() {
348336
neighborsPrefix := enc.LOCALHOP.Append(enc.NewGenericComponent("neighbors"))
@@ -370,15 +358,10 @@ func (dv *Router) createFaces() {
370358
dv.mutex.Unlock()
371359

372360
// Add neighbor to localhop neighbors
373-
dv.nfdc.Exec(nfdc.NfdMgmtCmd{
374-
Module: "fib",
375-
Cmd: "add-nexthop",
376-
Args: &mgmt.ControlArgs{
377-
Name: neighborsPrefix.Clone(),
378-
Cost: optional.Some(uint64(1)),
379-
FaceId: optional.Some(faceId),
380-
},
381-
Retries: 3,
361+
dv.execMgmtRetry("fib", "add-nexthop", &mgmt.ControlArgs{
362+
Name: neighborsPrefix.Clone(),
363+
Cost: optional.Some(uint64(1)),
364+
FaceId: optional.Some(faceId),
382365
})
383366
}
384367
}

dv/dv/table_algo.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package dv
33
import (
44
"github.com/named-data/ndnd/dv/config"
55
"github.com/named-data/ndnd/dv/table"
6-
fw "github.com/named-data/ndnd/fw/fw"
6+
"github.com/named-data/ndnd/fw/bier"
77
enc "github.com/named-data/ndnd/std/encoding"
88
"github.com/named-data/ndnd/std/log"
99
)
@@ -138,7 +138,7 @@ func (dv *Router) updateFib() {
138138

139139
// Rebuild the BIFT whenever the FIB changes so BIER forwarding paths
140140
// are always consistent with the routing table.
141-
if fw.IsBierEnabled() {
142-
fw.Bift.BuildFromFib()
141+
if bier.IsBierEnabled() {
142+
bier.Bift.BuildFromFib()
143143
}
144144
}

e2e/dv_util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def is_converged(nodes: list[Node], network=DEFAULT_NETWORK, use_nfdc=False) ->
5353
# We don't support that.
5454
routes = node.cmd('nfdc route list')
5555
else:
56-
routes = node.cmd('ndnd fw route-list')
56+
routes = node.cmd('ndnd fw route-list 2>&1')
5757
for other in nodes:
5858
if other == node:
5959
continue

e2e/test_001.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,6 @@ def scenario(ndn: Minindn, network='/minindn'):
6363
strategy = node.cmd('ndnd fw strategy-list')
6464
if "multicast" in strategy:
6565
raise Exception(f'Multicast is to be retired, unexpectedly present in strategy on {node.name}')
66-
expected_strategies = [
67-
# Localhop SVS sync interests should use broadcast (#174)
68-
"prefix=/minindn/32=DV/32=PES/32=svs strategy=/localhost/nfd/strategy/broadcast/v=1",
69-
70-
# Localhop advertisement sync interests should use broadcast strategy (#174)
71-
"prefix=/localhop/minindn/32=DV/32=ADS strategy=/localhost/nfd/strategy/broadcast/v=1",
72-
]
73-
for expected_strat in expected_strategies:
74-
if expected_strat not in strategy:
75-
raise Exception(f'Strategy {expected_strat!r} not in strategy on {node.name}')
7666

7767
for node, put_node in cat_requests:
7868
cmd = f'ndnd cat "{network}/{put_node.name}/test" > recv.test.bin 2> cat.log'

e2e/test_007.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def scenario(ndn: Minindn, network='/minindn'):
8181
producer_prefixes = {sync_prefix, f'{network}/svs/{producer.name}'}
8282
dv_util.wait_prefix_pet_ready({node: producer_prefixes for node in consumers}, deadline=60)
8383

84+
beg = time.time()
8485
# Step 5: Wait for publish → BIER sync interest propagation → unicast data fetch.
8586
# 20s delay + 30s SVS sync cycle + 20s fetch buffer = 70s total.
8687
info(f'Waiting for message propagation from {producer.name} (70s)...\n')
@@ -90,6 +91,7 @@ def scenario(ndn: Minindn, network='/minindn'):
9091
for node in chat_nodes:
9192
node.cmd("pkill -f 'ndnd svs-chat' 2>/dev/null; true")
9293
time.sleep(1) # let log buffers flush
94+
print(time.time() - beg, "seconds elapsed")
9395

9496
failures = []
9597
for consumer in consumers:

0 commit comments

Comments
 (0)