@@ -13,6 +13,7 @@ import (
1313
1414 "github.com/evstack/ev-node/block/internal/cache"
1515 "github.com/evstack/ev-node/block/internal/common"
16+ coreda "github.com/evstack/ev-node/core/da"
1617 coreexecutor "github.com/evstack/ev-node/core/execution"
1718 coresequencer "github.com/evstack/ev-node/core/sequencer"
1819 "github.com/evstack/ev-node/pkg/config"
@@ -32,13 +33,20 @@ type broadcaster[T any] interface {
3233 WriteToStoreAndBroadcast (ctx context.Context , payload T ) error
3334}
3435
36+ // daHandler is the expected interface for the DA publishing
37+ type daHandler interface {
38+ SubmitHeaders (ctx context.Context , cache cache.Manager ) error
39+ SubmitData (ctx context.Context , cache cache.Manager , signer signer.Signer , genesis genesis.Genesis ) error
40+ }
41+
3542// Executor handles block production, transaction processing, and state management
3643type Executor struct {
3744 // Core components
3845 store store.Store
3946 exec coreexecutor.Executor
4047 sequencer coresequencer.Sequencer
4148 signer signer.Signer
49+ da coreda.DA
4250
4351 // Shared components
4452 cache cache.Manager
@@ -63,6 +71,9 @@ type Executor struct {
6371 // Reaper for transaction processing
6472 reaper * Reaper
6573
74+ // DA handler for submissions
75+ daHandler daHandler
76+
6677 // Logging
6778 logger zerolog.Logger
6879
@@ -72,12 +83,18 @@ type Executor struct {
7283 wg sync.WaitGroup
7384}
7485
75- // NewExecutor creates a new block executor
86+ // NewExecutor creates a new block executor.
87+ // The executor is responsible for:
88+ // - Block production from sequencer batches
89+ // - State transitions and validation
90+ // - P2P broadcasting of produced blocks
91+ // - DA submission of headers and data
7692func NewExecutor (
7793 store store.Store ,
7894 exec coreexecutor.Executor ,
7995 sequencer coresequencer.Sequencer ,
8096 signer signer.Signer ,
97+ da coreda.DA ,
8198 cache cache.Manager ,
8299 metrics * common.Metrics ,
83100 config config.Config ,
@@ -96,6 +113,7 @@ func NewExecutor(
96113 exec : exec ,
97114 sequencer : sequencer ,
98115 signer : signer ,
116+ da : da ,
99117 cache : cache ,
100118 metrics : metrics ,
101119 config : config ,
@@ -125,6 +143,9 @@ func (e *Executor) Start(ctx context.Context) error {
125143 }
126144 e .reaper = NewReaper (e .ctx , e .exec , e .sequencer , e .genesis .ChainID , DefaultInterval , e .logger , reaperStore , e )
127145
146+ // Initialize DA handler
147+ e .daHandler = common .NewDAHandler (e .da , e .cache , e .config , e .genesis , e .options , e .logger )
148+
128149 // Start execution loop
129150 e .wg .Add (1 )
130151 go func () {
@@ -139,6 +160,13 @@ func (e *Executor) Start(ctx context.Context) error {
139160 e .reaper .Start (e .ctx )
140161 }()
141162
163+ // Start DA submission loop
164+ e .wg .Add (1 )
165+ go func () {
166+ defer e .wg .Done ()
167+ e .daSubmissionLoop ()
168+ }()
169+
142170 e .logger .Info ().Msg ("executor started" )
143171 return nil
144172}
@@ -628,6 +656,36 @@ func (e *Executor) recordBlockMetrics(data *types.Data) {
628656 e .metrics .CommittedHeight .Set (float64 (data .Metadata .Height ))
629657}
630658
659+ // daSubmissionLoop handles submission of headers and data to DA layer for aggregator nodes.
660+ func (e * Executor ) daSubmissionLoop () {
661+ e .logger .Info ().Msg ("starting DA submission loop" )
662+ defer e .logger .Info ().Msg ("DA submission loop stopped" )
663+
664+ ticker := time .NewTicker (e .config .DA .BlockTime .Duration )
665+ defer ticker .Stop ()
666+
667+ for {
668+ select {
669+ case <- e .ctx .Done ():
670+ return
671+ case <- ticker .C :
672+ // Submit headers
673+ if e .cache .NumPendingHeaders () != 0 {
674+ if err := e .daHandler .SubmitHeaders (e .ctx , e .cache ); err != nil {
675+ e .logger .Error ().Err (err ).Msg ("failed to submit headers" )
676+ }
677+ }
678+
679+ // Submit data
680+ if e .cache .NumPendingData () != 0 {
681+ if err := e .daHandler .SubmitData (e .ctx , e .cache , e .signer , e .genesis ); err != nil {
682+ e .logger .Error ().Err (err ).Msg ("failed to submit data" )
683+ }
684+ }
685+ }
686+ }
687+ }
688+
631689// BatchData represents batch data from sequencer
632690type BatchData struct {
633691 * coresequencer.Batch
0 commit comments