Skip to content

Commit 5c0928e

Browse files
committed
chore: self review
1 parent 839dc62 commit 5c0928e

9 files changed

Lines changed: 761 additions & 425 deletions

File tree

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package statehistory
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/NethermindEth/juno/core/felt"
8+
"github.com/NethermindEth/juno/core/state"
9+
"github.com/NethermindEth/juno/db"
10+
"github.com/NethermindEth/juno/db/dbutils"
11+
"github.com/NethermindEth/juno/migration/pipeline"
12+
"github.com/NethermindEth/juno/migration/semaphore"
13+
)
14+
15+
type classHashIngestor struct {
16+
baseIngestor
17+
}
18+
19+
var _ pipeline.State[*felt.Felt, task] = (*classHashIngestor)(nil)
20+
21+
func newClassHashIngestor(
22+
ctx context.Context,
23+
sem semaphore.ResourceSemaphore[db.Batch],
24+
database db.KeyValueReader,
25+
) *classHashIngestor {
26+
return &classHashIngestor{baseIngestor: newBaseIngestor(ctx, sem, database)}
27+
}
28+
29+
func (i *classHashIngestor) Run(index int, addr *felt.Felt, outputs chan<- task) error {
30+
t := &i.tasks[index]
31+
32+
deprecatedPrefix := db.DeprecatedContractClassHashHistoryKey(addr)
33+
contract, err := state.GetContract(i.database, addr)
34+
if err != nil {
35+
return fmt.Errorf("class-hash: GetContract(%s): %w", addr, err)
36+
}
37+
38+
deployKey := db.ContractClassHashHistoryAtBlockKey(addr, contract.DeployedHeight)
39+
deployEntryExists, err := i.database.Has(deployKey)
40+
if err != nil {
41+
return fmt.Errorf("class-hash: Has(deploy entry): %w", err)
42+
}
43+
44+
depIt, err := i.database.NewIterator(deprecatedPrefix, true)
45+
if err != nil {
46+
return fmt.Errorf("class-hash: open deprecated iter(%s): %w", addr, err)
47+
}
48+
defer depIt.Close()
49+
50+
if !depIt.First() {
51+
if deployEntryExists {
52+
return nil
53+
}
54+
err = state.WriteClassHashHistory(
55+
t.batch,
56+
addr,
57+
contract.DeployedHeight,
58+
&contract.ClassHash,
59+
)
60+
if err != nil {
61+
return err
62+
}
63+
t.completedAddrs++
64+
t.entryCount++
65+
return i.flush(t, outputs)
66+
}
67+
68+
rawValue, err := depIt.Value()
69+
if err != nil {
70+
return fmt.Errorf("class-hash: read first value(%s): %w", addr, err)
71+
}
72+
deployClassHash := felt.FromBytes[felt.Felt](rawValue)
73+
if err := state.WriteClassHashHistory(
74+
t.batch,
75+
addr,
76+
contract.DeployedHeight,
77+
&deployClassHash,
78+
); err != nil {
79+
return err
80+
}
81+
t.entryCount++
82+
if err := i.flush(t, outputs); err != nil {
83+
return err
84+
}
85+
86+
// Shift-up loop: each block in the deprecated history gets the *next*
87+
// entry's value (since in the old layout the value at block B was the
88+
// value before B's write). The final block gets the head class hash.
89+
for {
90+
block, err := parseBlockKey(depIt.Key(), deprecatedPrefix)
91+
if err != nil {
92+
return fmt.Errorf("class-hash(%s): %w", addr, err)
93+
}
94+
hasNext := depIt.Next()
95+
historyValue := contract.ClassHash
96+
if hasNext {
97+
rawValue, err := depIt.Value()
98+
if err != nil {
99+
return fmt.Errorf("class-hash(%s): %w", addr, err)
100+
}
101+
historyValue = felt.FromBytes[felt.Felt](rawValue)
102+
}
103+
if err := state.WriteClassHashHistory(t.batch, addr, block, &historyValue); err != nil {
104+
return err
105+
}
106+
t.entryCount++
107+
if err := i.flush(t, outputs); err != nil {
108+
return err
109+
}
110+
if !hasNext {
111+
break
112+
}
113+
}
114+
115+
if err := t.batch.DeleteRange(deprecatedPrefix, dbutils.UpperBound(deprecatedPrefix)); err != nil {
116+
return fmt.Errorf("class-hash: DeleteRange deprecated(%s): %w", addr, err)
117+
}
118+
t.completedAddrs++
119+
return nil
120+
}

migration/statehistory/committer.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ func newCommitter(
2929
}
3030

3131
func (c *committer) Run(_ int, t task, _ chan<- struct{}) error {
32+
defer c.batchSemaphore.Put()
33+
3234
c.logger.Debug(
3335
"writing batch",
34-
zap.Int("addrCount", t.addrCount),
36+
zap.Int("completedAddrs", t.completedAddrs),
3537
zap.Int("entryCount", t.entryCount),
3638
zap.Int("batchSize", t.batch.Size()),
3739
)
@@ -41,8 +43,7 @@ func (c *committer) Run(_ int, t task, _ chan<- struct{}) error {
4143
return err
4244
}
4345

44-
c.counter.log(byteSize, t.addrCount, t.entryCount)
45-
c.batchSemaphore.Put()
46+
c.counter.log(byteSize, t.completedAddrs, t.entryCount)
4647
return nil
4748
}
4849

migration/statehistory/counter.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import (
99
)
1010

1111
type counter struct {
12-
logger log.StructuredLogger
13-
timeLogRate time.Duration
14-
phaseName string
15-
start time.Time
16-
size uint64
17-
addrCount uint64
18-
entryCount uint64
12+
logger log.StructuredLogger
13+
timeLogRate time.Duration
14+
phaseName string
15+
start time.Time
16+
size uint64
17+
completedAddrs uint64
18+
entryCount uint64
1919
}
2020

2121
func newCounter(logger log.StructuredLogger, timeLogRate time.Duration, phaseName string) counter {
@@ -27,29 +27,29 @@ func newCounter(logger log.StructuredLogger, timeLogRate time.Duration, phaseNam
2727
}
2828
}
2929

30-
func (c *counter) log(byteSize uint64, addrCount, entryCount int) {
30+
func (c *counter) log(byteSize uint64, completedAddrs, entryCount int) {
3131
c.size += byteSize
32-
c.addrCount += uint64(addrCount)
32+
c.completedAddrs += uint64(completedAddrs)
3333
c.entryCount += uint64(entryCount)
3434

3535
now := time.Now()
3636
elapsed := now.Sub(c.start).Seconds()
37-
if elapsed > float64(c.timeLogRate.Seconds()) {
37+
if elapsed > c.timeLogRate.Seconds() {
3838
mbs := float64(c.size) / float64(db.Megabyte)
3939
c.logger.Info(
4040
"write speed",
4141
zap.String("phase", c.phaseName),
4242
zap.Float64("MB", mbs),
4343
zap.Float64("MB/s", mbs/elapsed),
44-
zap.Uint64("contracts", c.addrCount),
45-
zap.Float64("contracts/s", float64(c.addrCount)/elapsed),
44+
zap.Uint64("completedContracts", c.completedAddrs),
45+
zap.Float64("completedContracts/s", float64(c.completedAddrs)/elapsed),
4646
zap.Uint64("entries", c.entryCount),
4747
zap.Float64("entries/s", float64(c.entryCount)/elapsed),
4848
zap.Float64("time", elapsed),
4949
)
5050
c.start = now
5151
c.size = 0
52-
c.addrCount = 0
52+
c.completedAddrs = 0
5353
c.entryCount = 0
5454
}
5555
}

migration/statehistory/ingestor.go

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,63 @@
11
package statehistory
22

33
import (
4-
"github.com/NethermindEth/juno/core/felt"
4+
"context"
5+
56
"github.com/NethermindEth/juno/db"
6-
"github.com/NethermindEth/juno/migration/pipeline"
77
"github.com/NethermindEth/juno/migration/semaphore"
88
)
99

10-
type FlushBatchFn func(t *task)
11-
12-
type ingestor struct {
10+
type baseIngestor struct {
11+
ctx context.Context
1312
batchSemaphore semaphore.ResourceSemaphore[db.Batch]
1413
database db.KeyValueReader
1514
tasks []task
16-
transform func(db.KeyValueReader, *task, felt.Address, FlushBatchFn) error
1715
}
1816

19-
var _ pipeline.State[felt.Address, task] = (*ingestor)(nil)
20-
21-
func newIngestor(
17+
// newBaseIngestor pre-allocates one batch per ingestor slot. The semaphore is
18+
// created with capacity ingestorCount+1 immediately before this call, so the
19+
// acquires cannot block — using GetBlocking keeps the constructor signature
20+
// error-free.
21+
func newBaseIngestor(
22+
ctx context.Context,
2223
sem semaphore.ResourceSemaphore[db.Batch],
2324
database db.KeyValueReader,
24-
transform func(db.KeyValueReader, *task, felt.Address, FlushBatchFn) error,
25-
) *ingestor {
25+
) baseIngestor {
2626
tasks := make([]task, ingestorCount)
2727
for i := range tasks {
2828
tasks[i] = task{batch: sem.GetBlocking()}
2929
}
30-
return &ingestor{batchSemaphore: sem, database: database, tasks: tasks, transform: transform}
31-
}
32-
33-
func (p *ingestor) Run(index int, addr felt.Address, outputs chan<- task) error {
34-
t := &p.tasks[index]
35-
flush := func(t *task) {
36-
if t.batch.Size() < targetBatchByteSize {
37-
return
38-
}
39-
outputs <- *t
40-
*t = task{batch: p.batchSemaphore.GetBlocking()}
30+
return baseIngestor{
31+
ctx: ctx,
32+
batchSemaphore: sem,
33+
database: database,
34+
tasks: tasks,
4135
}
36+
}
4237

43-
if err := p.transform(p.database, t, addr, flush); err != nil {
44-
return err
38+
// flush emits the current task downstream when its batch hits target size and
39+
// acquires a fresh batch. The ctx-aware select on the channel send is the
40+
// snappy cancellation point. The semaphore acquire uses GetBlocking — it is
41+
// guaranteed to unblock within one committer iteration because the committer's
42+
// deferred Put always runs.
43+
func (b *baseIngestor) flush(t *task, outputs chan<- task) error {
44+
if t.batch.Size() < targetBatchByteSize {
45+
return nil
4546
}
46-
47-
if t.batch.Size() >= targetBatchByteSize {
48-
outputs <- *t
49-
*t = task{batch: p.batchSemaphore.GetBlocking()}
47+
select {
48+
case <-b.ctx.Done():
49+
return b.ctx.Err()
50+
case outputs <- *t:
5051
}
52+
*t = task{batch: b.batchSemaphore.GetBlocking()}
5153
return nil
5254
}
5355

54-
func (p *ingestor) Done(index int, outputs chan<- task) error {
55-
outputs <- p.tasks[index]
56+
func (b *baseIngestor) Done(index int, outputs chan<- task) error {
57+
select {
58+
case <-b.ctx.Done():
59+
return b.ctx.Err()
60+
case outputs <- b.tasks[index]:
61+
}
5662
return nil
5763
}

0 commit comments

Comments
 (0)