Skip to content

Commit f53887f

Browse files
authored
feat: add binary codec with backward-compatible migration (#5419)
1 parent 7c3ec31 commit f53887f

6 files changed

Lines changed: 336 additions & 43 deletions

File tree

pkg/accounting/accounting.go

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"sync"
1717
"time"
1818

19+
"github.com/ethersphere/bee/v2/pkg/bigint"
1920
"github.com/ethersphere/bee/v2/pkg/log"
2021
"github.com/ethersphere/bee/v2/pkg/p2p"
2122
"github.com/ethersphere/bee/v2/pkg/pricing"
@@ -353,7 +354,7 @@ func (c *creditAction) Apply() error {
353354

354355
loggerV2.Debug("credit action apply", "crediting_peer_address", c.peer, "price", c.price, "new_balance", nextBalance)
355356

356-
err = c.accounting.store.Put(peerBalanceKey(c.peer), nextBalance)
357+
err = c.accounting.store.Put(peerBalanceKey(c.peer), &bigint.BigInt{Int: nextBalance})
357358
if err != nil {
358359
return fmt.Errorf("failed to persist balance: %w", err)
359360
}
@@ -406,7 +407,7 @@ func (c *creditAction) Apply() error {
406407
loggerV2.Debug("credit action apply; decreasing originated balance", "crediting_peer_address", c.peer, "current_balance", nextOriginBalance)
407408
}
408409

409-
err = c.accounting.store.Put(originatedBalanceKey(c.peer), nextOriginBalance)
410+
err = c.accounting.store.Put(originatedBalanceKey(c.peer), &bigint.BigInt{Int: nextOriginBalance})
410411
if err != nil {
411412
return fmt.Errorf("failed to persist originated balance: %w", err)
412413
}
@@ -519,7 +520,8 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
519520

520521
// Balance returns the current balance for the given peer.
521522
func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) {
522-
err = a.store.Get(peerBalanceKey(peer), &balance)
523+
var w bigint.BigInt
524+
err = a.store.Get(peerBalanceKey(peer), &w)
523525

524526
if err != nil {
525527
if errors.Is(err, storage.ErrNotFound) {
@@ -528,12 +530,13 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) {
528530
return nil, err
529531
}
530532

531-
return balance, nil
533+
return w.Int, nil
532534
}
533535

534536
// OriginatedBalance returns the current balance for the given peer.
535537
func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) {
536-
err = a.store.Get(originatedBalanceKey(peer), &balance)
538+
var w bigint.BigInt
539+
err = a.store.Get(originatedBalanceKey(peer), &w)
537540

538541
if err != nil {
539542
if errors.Is(err, storage.ErrNotFound) {
@@ -542,12 +545,13 @@ func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, er
542545
return nil, err
543546
}
544547

545-
return balance, nil
548+
return w.Int, nil
546549
}
547550

548551
// SurplusBalance returns the current balance for the given peer.
549552
func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) {
550-
err = a.store.Get(peerSurplusBalanceKey(peer), &balance)
553+
var w bigint.BigInt
554+
err = a.store.Get(peerSurplusBalanceKey(peer), &w)
551555

552556
if err != nil {
553557
if errors.Is(err, storage.ErrNotFound) {
@@ -556,11 +560,11 @@ func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err e
556560
return nil, err
557561
}
558562

559-
if balance.Cmp(big.NewInt(0)) < 0 {
563+
if w.Cmp(big.NewInt(0)) < 0 {
560564
return nil, ErrInvalidValue
561565
}
562566

563-
return balance, nil
567+
return w.Int, nil
564568
}
565569

566570
// CompensatedBalance returns balance decreased by surplus balance
@@ -682,13 +686,13 @@ func (a *Accounting) Balances() (map[string]*big.Int, error) {
682686
}
683687

684688
if _, ok := s[addr.String()]; !ok {
685-
var storevalue *big.Int
686-
err = a.store.Get(peerBalanceKey(addr), &storevalue)
689+
var w bigint.BigInt
690+
err = a.store.Get(peerBalanceKey(addr), &w)
687691
if err != nil {
688692
return false, fmt.Errorf("get peer %s balance: %w", addr.String(), err)
689693
}
690694

691-
s[addr.String()] = storevalue
695+
s[addr.String()] = w.Int
692696
}
693697

694698
return false, nil
@@ -866,14 +870,15 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) {
866870
accountingPeer.lock.Lock()
867871
defer accountingPeer.lock.Unlock()
868872

869-
balance := new(big.Int)
873+
var w bigint.BigInt
870874
zero := big.NewInt(0)
871875

872-
err := a.store.Get(peerBalanceKey(peer), &balance)
873-
if err != nil {
874-
if !errors.Is(err, storage.ErrNotFound) {
875-
return nil, err
876-
}
876+
err := a.store.Get(peerBalanceKey(peer), &w)
877+
if err != nil && !errors.Is(err, storage.ErrNotFound) {
878+
return nil, err
879+
}
880+
balance := w.Int
881+
if balance == nil {
877882
balance = big.NewInt(0)
878883
}
879884

@@ -891,14 +896,15 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) {
891896

892897
accountingPeer := a.getAccountingPeer(peer)
893898

894-
balance := new(big.Int)
899+
var wl bigint.BigInt
895900
zero := big.NewInt(0)
896901

897-
err := a.store.Get(peerBalanceKey(peer), &balance)
898-
if err != nil {
899-
if !errors.Is(err, storage.ErrNotFound) {
900-
return nil, err
901-
}
902+
err := a.store.Get(peerBalanceKey(peer), &wl)
903+
if err != nil && !errors.Is(err, storage.ErrNotFound) {
904+
return nil, err
905+
}
906+
balance := wl.Int
907+
if balance == nil {
902908
balance = big.NewInt(0)
903909
}
904910

@@ -919,16 +925,20 @@ func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) {
919925
// shadowBalance returns the current debt reduced by any potentially debitable amount stored in shadowReservedBalance
920926
// this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve
921927
func (a *Accounting) shadowBalance(peer swarm.Address, accountingPeer *accountingPeer) (shadowBalance *big.Int, err error) {
922-
balance := new(big.Int)
928+
var ws bigint.BigInt
923929
zero := big.NewInt(0)
924930

925-
err = a.store.Get(peerBalanceKey(peer), &balance)
931+
err = a.store.Get(peerBalanceKey(peer), &ws)
926932
if err != nil {
927933
if errors.Is(err, storage.ErrNotFound) {
928934
return zero, nil
929935
}
930936
return nil, err
931937
}
938+
balance := ws.Int
939+
if balance == nil {
940+
balance = zero
941+
}
932942

933943
if balance.Cmp(zero) >= 0 {
934944
return zero, nil
@@ -986,7 +996,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
986996

987997
loggerV2.Debug("registering payment sent", "peer_address", peer, "amount", amount, "new_balance", nextBalance)
988998

989-
err = a.store.Put(peerBalanceKey(peer), nextBalance)
999+
err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
9901000
if err != nil {
9911001
a.logger.Error(err, "notify payment sent; failed to persist balance")
9921002
return
@@ -1043,7 +1053,7 @@ func (a *Accounting) NotifyPaymentReceived(peer swarm.Address, amount *big.Int)
10431053

10441054
loggerV2.Debug("surplus crediting peer", "peer_address", peer, "amount", amount, "new_balance", increasedSurplus)
10451055

1046-
err = a.store.Put(peerSurplusBalanceKey(peer), increasedSurplus)
1056+
err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: increasedSurplus})
10471057
if err != nil {
10481058
return fmt.Errorf("failed to persist surplus balance: %w", err)
10491059
}
@@ -1064,7 +1074,7 @@ func (a *Accounting) NotifyPaymentReceived(peer swarm.Address, amount *big.Int)
10641074

10651075
loggerV2.Debug("crediting peer", "peer_address", peer, "amount", amount, "new_balance", nextBalance)
10661076

1067-
err = a.store.Put(peerBalanceKey(peer), nextBalance)
1077+
err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
10681078
if err != nil {
10691079
return fmt.Errorf("failed to persist balance: %w", err)
10701080
}
@@ -1083,7 +1093,7 @@ func (a *Accounting) NotifyPaymentReceived(peer swarm.Address, amount *big.Int)
10831093

10841094
loggerV2.Debug("surplus crediting peer due to refreshment", "peer_address", peer, "amount", surplusGrowth, "new_balance", increasedSurplus)
10851095

1086-
err = a.store.Put(peerSurplusBalanceKey(peer), increasedSurplus)
1096+
err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: increasedSurplus})
10871097
if err != nil {
10881098
return fmt.Errorf("failed to persist surplus balance: %w", err)
10891099
}
@@ -1165,7 +1175,7 @@ func (a *Accounting) NotifyRefreshmentSent(peer swarm.Address, attemptedAmount,
11651175

11661176
newBalance := new(big.Int).Add(currentBalance, amount)
11671177

1168-
err = a.store.Put(peerBalanceKey(peer), newBalance)
1178+
err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: newBalance})
11691179
if err != nil {
11701180
a.logger.Error(err, "notifyrefreshmentsent failed to persist balance")
11711181
return
@@ -1206,7 +1216,7 @@ func (a *Accounting) NotifyRefreshmentReceived(peer swarm.Address, amount *big.I
12061216

12071217
// We allow a refreshment to potentially put us into debt as it was previously negotiated and be limited to the peer's outstanding debt plus shadow reserve
12081218
loggerV2.Debug("crediting peer", "peer_address", peer, "amount", amount, "new_balance", nextBalance)
1209-
err = a.store.Put(peerBalanceKey(peer), nextBalance)
1219+
err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
12101220
if err != nil {
12111221
return fmt.Errorf("failed to persist balance: %w", err)
12121222
}
@@ -1269,7 +1279,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric
12691279
if newSurplusBalance.Cmp(big.NewInt(0)) >= 0 {
12701280
loggerV2.Debug("surplus debiting peer", "peer_address", peer, "price", price, "new_balance", newSurplusBalance)
12711281

1272-
err = a.store.Put(peerSurplusBalanceKey(peer), newSurplusBalance)
1282+
err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: newSurplusBalance})
12731283
if err != nil {
12741284
return nil, fmt.Errorf("failed to persist surplus balance: %w", err)
12751285
}
@@ -1290,7 +1300,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric
12901300
// let's store 0 as surplus balance
12911301
loggerV2.Debug("surplus debiting peer", "peer_address", peer, "amount", debitIncrease, "new_balance", 0)
12921302

1293-
err = a.store.Put(peerSurplusBalanceKey(peer), big.NewInt(0))
1303+
err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: big.NewInt(0)})
12941304
if err != nil {
12951305
return nil, fmt.Errorf("failed to persist surplus balance: %w", err)
12961306
}
@@ -1308,7 +1318,7 @@ func (a *Accounting) increaseBalance(peer swarm.Address, _ *accountingPeer, pric
13081318

13091319
loggerV2.Debug("debiting peer", "peer_address", peer, "price", price, "new_balance", nextBalance)
13101320

1311-
err = a.store.Put(peerBalanceKey(peer), nextBalance)
1321+
err = a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: nextBalance})
13121322
if err != nil {
13131323
return nil, fmt.Errorf("failed to persist balance: %w", err)
13141324
}
@@ -1445,12 +1455,12 @@ func (a *Accounting) Connect(peer swarm.Address, fullNode bool) {
14451455
accountingPeer.thresholdGrowAt.Set(thresholdGrowStep)
14461456
accountingPeer.disconnectLimit.Set(disconnectLimit)
14471457

1448-
err := a.store.Put(peerBalanceKey(peer), zero)
1458+
err := a.store.Put(peerBalanceKey(peer), &bigint.BigInt{Int: zero})
14491459
if err != nil {
14501460
a.logger.Error(err, "failed to persist balance")
14511461
}
14521462

1453-
err = a.store.Put(peerSurplusBalanceKey(peer), zero)
1463+
err = a.store.Put(peerSurplusBalanceKey(peer), &bigint.BigInt{Int: zero})
14541464
if err != nil {
14551465
a.logger.Error(err, "failed to persist surplus balance")
14561466
}
@@ -1475,7 +1485,7 @@ func (a *Accounting) decreaseOriginatedBalanceTo(peer swarm.Address, limit *big.
14751485

14761486
// If originated balance is more into the negative domain, set it to limit
14771487
if originatedBalance.Cmp(toSet) < 0 {
1478-
err = a.store.Put(originatedBalanceKey(peer), toSet)
1488+
err = a.store.Put(originatedBalanceKey(peer), &bigint.BigInt{Int: toSet})
14791489
if err != nil {
14801490
return fmt.Errorf("failed to persist originated balance: %w", err)
14811491
}
@@ -1497,7 +1507,7 @@ func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big
14971507
// Move originated balance into the positive domain by amount
14981508
newOriginatedBalance := new(big.Int).Add(originatedBalance, amount)
14991509

1500-
err = a.store.Put(originatedBalanceKey(peer), newOriginatedBalance)
1510+
err = a.store.Put(originatedBalanceKey(peer), &bigint.BigInt{Int: newOriginatedBalance})
15011511
if err != nil {
15021512
return fmt.Errorf("failed to persist originated balance: %w", err)
15031513
}

pkg/bigint/bigint.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,21 @@ func (i *BigInt) UnmarshalJSON(b []byte) error {
4242
func Wrap(i *big.Int) *BigInt {
4343
return &BigInt{Int: i}
4444
}
45+
46+
// MarshalBinary implements encoding.BinaryMarshaler using Gob encoding.
47+
// Panics if the underlying *big.Int is nil, as this indicates a programmer error.
48+
func (i *BigInt) MarshalBinary() ([]byte, error) {
49+
if i.Int == nil {
50+
panic("bigint: MarshalBinary called on nil Int")
51+
}
52+
return i.GobEncode()
53+
}
54+
55+
// UnmarshalBinary implements encoding.BinaryUnmarshaler using Gob decoding.
56+
func (i *BigInt) UnmarshalBinary(data []byte) error {
57+
if len(data) == 0 {
58+
return fmt.Errorf("bigint: UnmarshalBinary called with empty data")
59+
}
60+
i.Int = new(big.Int)
61+
return i.GobDecode(data)
62+
}

pkg/bigint/bigint_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,94 @@ import (
1414
"github.com/ethersphere/bee/v2/pkg/bigint"
1515
)
1616

17+
func TestBinaryMarshalingRoundTrip(t *testing.T) {
18+
t.Parallel()
19+
20+
tests := []struct {
21+
name string
22+
val *big.Int
23+
}{
24+
{"positive", big.NewInt(123456789)},
25+
{"negative", big.NewInt(-987654321)},
26+
{"zero", big.NewInt(0)},
27+
{"large", new(big.Int).Mul(big.NewInt(math.MaxInt64), big.NewInt(math.MaxInt64))},
28+
}
29+
30+
for _, tc := range tests {
31+
t.Run(tc.name, func(t *testing.T) {
32+
t.Parallel()
33+
34+
original := bigint.Wrap(tc.val)
35+
data, err := original.MarshalBinary()
36+
if err != nil {
37+
t.Fatalf("MarshalBinary: %v", err)
38+
}
39+
40+
var got bigint.BigInt
41+
if err := got.UnmarshalBinary(data); err != nil {
42+
t.Fatalf("UnmarshalBinary: %v", err)
43+
}
44+
45+
if got.Cmp(tc.val) != 0 {
46+
t.Fatalf("got %v, want %v", got.Int, tc.val)
47+
}
48+
})
49+
}
50+
}
51+
52+
// TestBinaryMarshalingGobCompatibility verifies that MarshalBinary produces
53+
// byte-identical output to big.Int.GobEncode, confirming that nodes upgrading
54+
// from the old code (which stored raw *big.Int via GobEncode) will write
55+
// identical bytes after migration.
56+
func TestBinaryMarshalingGobCompatibility(t *testing.T) {
57+
t.Parallel()
58+
59+
val := big.NewInt(555000)
60+
gobData, err := val.GobEncode()
61+
if err != nil {
62+
t.Fatal(err)
63+
}
64+
65+
newData, err := bigint.Wrap(val).MarshalBinary()
66+
if err != nil {
67+
t.Fatal(err)
68+
}
69+
70+
if !reflect.DeepEqual(gobData, newData) {
71+
t.Fatalf("MarshalBinary output differs from GobEncode: got %v, want %v", newData, gobData)
72+
}
73+
74+
var got bigint.BigInt
75+
if err := got.UnmarshalBinary(gobData); err != nil {
76+
t.Fatalf("UnmarshalBinary of gob data: %v", err)
77+
}
78+
if got.Cmp(val) != 0 {
79+
t.Fatalf("got %v, want %v", got.Int, val)
80+
}
81+
}
82+
83+
func TestMarshalBinaryNilPanics(t *testing.T) {
84+
t.Parallel()
85+
86+
defer func() {
87+
if r := recover(); r == nil {
88+
t.Fatal("expected panic on MarshalBinary with nil Int, got none")
89+
}
90+
}()
91+
92+
var w bigint.BigInt // Int is nil
93+
_, _ = w.MarshalBinary()
94+
}
95+
96+
func TestUnmarshalBinaryEmptyErrors(t *testing.T) {
97+
t.Parallel()
98+
99+
var w bigint.BigInt
100+
if err := w.UnmarshalBinary([]byte{}); err == nil {
101+
t.Fatal("expected error on UnmarshalBinary with empty data, got nil")
102+
}
103+
}
104+
17105
func TestMarshaling(t *testing.T) {
18106
t.Parallel()
19107

0 commit comments

Comments
 (0)