Skip to content

Commit ffb185b

Browse files
committed
remove submission path from blob service
1 parent cd77a57 commit ffb185b

6 files changed

Lines changed: 59 additions & 306 deletions

File tree

api/client/client.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ import (
99
logging "github.com/ipfs/go-log/v2"
1010
"google.golang.org/grpc"
1111

12-
appfibre "github.com/celestiaorg/celestia-app/v8/fibre"
13-
1412
"github.com/celestiaorg/celestia-node/blob"
15-
"github.com/celestiaorg/celestia-node/fibre"
1613
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
1714
stateapi "github.com/celestiaorg/celestia-node/nodebuilder/state"
1815
"github.com/celestiaorg/celestia-node/state"
@@ -142,24 +139,8 @@ func (c *Client) initTxClient(
142139
}
143140
c.State = core
144141

145-
// setup fibre client if the fibre key is available in the keyring
146-
var fibreSvc *fibre.Service
147-
fibreCfg := appfibre.DefaultClientConfig()
148-
fibreCfg.StateAddress = conn.Target()
149-
fibreClient, err := appfibre.NewClient(kr, fibreCfg)
150-
if err != nil {
151-
log.Warnw("fibre client not available, fibre blob submission disabled", "err", err)
152-
} else {
153-
if err := fibreClient.Start(ctx); err != nil {
154-
log.Warnw("failed to start fibre client, fibre blob submission disabled", "err", err)
155-
} else {
156-
acc := fibre.NewAccountClient(tc, conn)
157-
fibreSvc = fibre.NewService(fibreClient, tc, acc)
158-
}
159-
}
160-
161142
// setup blob submission service using core
162-
blobSvc := blob.NewService(core, fibreSvc, nil, nil, nil)
143+
blobSvc := blob.NewService(core, nil, nil, nil)
163144
err = blobSvc.Start(ctx)
164145
if err != nil {
165146
_ = core.Stop(ctx)
@@ -180,11 +161,6 @@ func (c *Client) initTxClient(
180161
if err != nil {
181162
return fmt.Errorf("failed to stop blob service: %w", err)
182163
}
183-
if fibreClient != nil {
184-
if stopErr := fibreClient.Stop(ctx); stopErr != nil {
185-
log.Warnw("failed to stop fibre client", "err", stopErr)
186-
}
187-
}
188164
err = core.Stop(ctx)
189165
if err != nil {
190166
return fmt.Errorf("failed to stop core accessor: %w", err)

blob/service.go

Lines changed: 15 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"go.opentelemetry.io/otel/codes"
1818
"go.opentelemetry.io/otel/trace"
1919

20-
appfibre "github.com/celestiaorg/celestia-app/v8/fibre"
2120
"github.com/celestiaorg/celestia-app/v8/pkg/appconsts"
2221
pkgproof "github.com/celestiaorg/celestia-app/v8/pkg/proof"
2322
"github.com/celestiaorg/go-square/v4/inclusion"
@@ -30,13 +29,11 @@ import (
3029
"github.com/celestiaorg/celestia-node/share"
3130
"github.com/celestiaorg/celestia-node/share/shwap"
3231
"github.com/celestiaorg/celestia-node/state"
33-
"github.com/celestiaorg/celestia-node/state/txclient"
3432
)
3533

3634
var (
37-
ErrBlobNotFound = errors.New("blob: not found")
38-
ErrInvalidProof = errors.New("blob: invalid proof")
39-
ErrMixedBlobTypes = errors.New("blob: cannot mix fibre (v2) and regular blobs in a single submit")
35+
ErrBlobNotFound = errors.New("blob: not found")
36+
ErrInvalidProof = errors.New("blob: invalid proof")
4037

4138
log = logging.Logger("blob")
4239
tracer = otel.Tracer("blob/service")
@@ -53,28 +50,12 @@ type Submitter interface {
5350
SubmitPayForBlob(context.Context, []*libshare.Blob, *state.TxConfig) (*types.TxResponse, error)
5451
}
5552

56-
// FibreSubmitter is an interface for submitting blobs to the Fibre network.
57-
type FibreSubmitter interface {
58-
Submit(
59-
ctx context.Context,
60-
ns libshare.Namespace,
61-
data []byte,
62-
config *txclient.TxConfig,
63-
) (
64-
_ *appfibre.PutResult,
65-
_ *appfibre.PaymentPromise,
66-
err error,
67-
)
68-
}
69-
7053
type Service struct {
7154
// ctx represents the Service's lifecycle context.
7255
ctx context.Context
7356
cancel context.CancelFunc
7457
// accessor dials the given celestia-core endpoint to submit blobs.
7558
blobSubmitter Submitter
76-
// fibreSubmitter submits blobs to the Fibre network.
77-
fibreSubmitter FibreSubmitter
7859
// shareGetter retrieves the EDS to fetch all shares from the requested header.
7960
shareGetter shwap.Getter
8061
// headerGetter fetches header by the provided height
@@ -87,18 +68,16 @@ type Service struct {
8768

8869
func NewService(
8970
submitter Submitter,
90-
fibreSubmitter FibreSubmitter,
9171
getter shwap.Getter,
9272
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error),
9373
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error),
9474
) *Service {
9575
return &Service{
96-
blobSubmitter: submitter,
97-
fibreSubmitter: fibreSubmitter,
98-
shareGetter: getter,
99-
headerGetter: headerGetter,
100-
headerSub: headerSub,
101-
metrics: nil, // Will be initialized via WithMetrics() if needed
76+
blobSubmitter: submitter,
77+
shareGetter: getter,
78+
headerGetter: headerGetter,
79+
headerSub: headerSub,
80+
metrics: nil, // Will be initialized via WithMetrics() if needed
10281
}
10382
}
10483

@@ -197,14 +176,10 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan
197176
return blobCh, nil
198177
}
199178

200-
// Submit sends PFB/PFF transaction and reports the height at which it was included.
179+
// Submit sends PFB transaction and reports the height at which it was included.
201180
// Allows sending multiple Blobs atomically synchronously.
202181
// Uses default wallet registered on the Node.
203182
// Handles gas estimation and fee calculation.
204-
//
205-
// If all blobs are fibre blobs (share version 2), they are submitted via the fibre network.
206-
// If all blobs are regular blobs, they are submitted via the standard PFB path.
207-
// Mixing fibre and regular blobs in a single submit is not allowed.
208183
func (s *Service) Submit(ctx context.Context, blobs []*Blob, txConfig *SubmitOptions) (_ uint64, err error) {
209184
ctx, span := tracer.Start(ctx, "blob/submit")
210185
defer func() {
@@ -216,46 +191,21 @@ func (s *Service) Submit(ctx context.Context, blobs []*Blob, txConfig *SubmitOpt
216191
}
217192
}()
218193

219-
if len(blobs) == 0 {
220-
return 0, errors.New("no blobs provided")
221-
}
222-
223-
allFibre := slices.IndexFunc(blobs, func(b *Blob) bool { return !b.IsFibreBlob() }) == -1
224-
anyFibre := slices.IndexFunc(blobs, func(b *Blob) bool { return b.IsFibreBlob() }) != -1
225-
226-
if anyFibre && !allFibre {
227-
return 0, ErrMixedBlobTypes
228-
}
229-
230-
if allFibre {
231-
return s.submitFibreBlobs(ctx, blobs, txConfig)
232-
}
233-
234194
libBlobs := make([]*libshare.Blob, len(blobs))
235195
for i := range blobs {
196+
if blobs[i].IsFibreBlob() {
197+
return 0, errors.New("cannot submit fibre blob. please use Fibre Submit instead")
198+
}
236199
libBlobs[i] = blobs[i].Blob
237200
}
238-
resp, err := s.blobSubmitter.SubmitPayForBlob(ctx, libBlobs, txConfig)
201+
202+
spanCtx := trace.ContextWithSpan(ctx, span)
203+
resp, err := s.blobSubmitter.SubmitPayForBlob(spanCtx, libBlobs, txConfig)
239204
if err != nil {
240205
return 0, err
241206
}
242-
return uint64(resp.Height), nil
243-
}
244-
245-
func (s *Service) submitFibreBlobs(ctx context.Context, blobs []*Blob, txConfig *SubmitOptions) (uint64, error) {
246-
if s.fibreSubmitter == nil {
247-
return 0, fmt.Errorf("fibre submitter is not available")
248-
}
249207

250-
var height uint64
251-
for _, blob := range blobs {
252-
res, _, err := s.fibreSubmitter.Submit(ctx, blob.Namespace(), blob.Data(), txConfig)
253-
if err != nil {
254-
return 0, err
255-
}
256-
height = res.Height
257-
}
258-
return height, nil
208+
return uint64(resp.Height), nil
259209
}
260210

261211
// Get retrieves a blob in a given namespace at the given height by commitment.

blob/service_test.go

Lines changed: 2 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@ import (
1414
"time"
1515

1616
tmrand "github.com/cometbft/cometbft/libs/rand"
17-
"github.com/cosmos/cosmos-sdk/types"
1817
"github.com/golang/mock/gomock"
1918
ds "github.com/ipfs/go-datastore"
2019
ds_sync "github.com/ipfs/go-datastore/sync"
2120
"github.com/stretchr/testify/assert"
2221
"github.com/stretchr/testify/require"
2322

24-
appfibre "github.com/celestiaorg/celestia-app/v8/fibre"
2523
"github.com/celestiaorg/celestia-app/v8/pkg/wrapper"
2624
"github.com/celestiaorg/go-header/store"
2725
libshare "github.com/celestiaorg/go-square/v4/share"
@@ -36,8 +34,6 @@ import (
3634
"github.com/celestiaorg/celestia-node/share/ipld"
3735
"github.com/celestiaorg/celestia-node/share/shwap"
3836
"github.com/celestiaorg/celestia-node/share/shwap/getters/mock"
39-
"github.com/celestiaorg/celestia-node/state"
40-
"github.com/celestiaorg/celestia-node/state/txclient"
4137
)
4238

4339
func TestBlobService_Get(t *testing.T) {
@@ -904,7 +900,7 @@ func createServiceWithSub(ctx context.Context, t testing.TB, blobs []*Blob) (*Se
904900
nd, err := eds.NamespaceData(ctx, accessor, ns)
905901
return nd, err
906902
})
907-
return NewService(nil, nil, shareGetter, fn, fn2), headers
903+
return NewService(nil, shareGetter, fn, fn2), headers
908904
}
909905

910906
func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *Service {
@@ -948,7 +944,7 @@ func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *
948944
fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) {
949945
return nil, fmt.Errorf("not implemented")
950946
}
951-
return NewService(nil, nil, shareGetter, fn, fn2)
947+
return NewService(nil, shareGetter, fn, fn2)
952948
}
953949

954950
// TestProveCommitmentAllCombinations tests proving all the commitments in a block.
@@ -1157,127 +1153,6 @@ func buildEDS(t *testing.T, shares []libshare.Share) (*rsmt2d.ExtendedDataSquare
11571153
return eds, roots.Hash()
11581154
}
11591155

1160-
// mockSubmitter implements the Submitter interface for testing.
1161-
type mockSubmitter struct {
1162-
height int64
1163-
err error
1164-
called bool
1165-
}
1166-
1167-
func (m *mockSubmitter) SubmitPayForBlob(
1168-
context.Context,
1169-
[]*libshare.Blob,
1170-
*state.TxConfig,
1171-
) (*types.TxResponse, error) {
1172-
m.called = true
1173-
if m.err != nil {
1174-
return nil, m.err
1175-
}
1176-
return &types.TxResponse{Height: m.height}, nil
1177-
}
1178-
1179-
// mockFibreSubmitter implements the FibreSubmitter interface for testing.
1180-
type mockFibreSubmitter struct {
1181-
height uint64
1182-
err error
1183-
calls int
1184-
}
1185-
1186-
func (m *mockFibreSubmitter) Submit(context.Context, libshare.Namespace, []byte, *txclient.TxConfig,
1187-
) (*appfibre.PutResult, *appfibre.PaymentPromise, error) {
1188-
m.calls++
1189-
if m.err != nil {
1190-
return nil, nil, m.err
1191-
}
1192-
return &appfibre.PutResult{Height: m.height}, nil, nil
1193-
}
1194-
1195-
func newTestFibreBlob(t *testing.T) *Blob {
1196-
t.Helper()
1197-
ns := libshare.RandomBlobNamespace()
1198-
commitment := bytes.Repeat([]byte{0xAA}, libshare.FibreCommitmentSize)
1199-
signer := bytes.Repeat([]byte{0xBB}, 20)
1200-
libBlob, err := libshare.NewV2Blob(ns, 0, commitment, signer)
1201-
require.NoError(t, err)
1202-
blob, err := convertBlobs(libBlob)
1203-
require.NoError(t, err)
1204-
return blob[0]
1205-
}
1206-
1207-
func TestService_Submit_AllFibreBlobs(t *testing.T) {
1208-
fibreSub := &mockFibreSubmitter{height: 42}
1209-
blobSub := &mockSubmitter{height: 100}
1210-
1211-
svc := &Service{
1212-
blobSubmitter: blobSub,
1213-
fibreSubmitter: fibreSub,
1214-
}
1215-
svc.ctx, svc.cancel = context.WithCancel(context.Background())
1216-
defer svc.cancel()
1217-
1218-
blobs := []*Blob{newTestFibreBlob(t), newTestFibreBlob(t)}
1219-
height, err := svc.Submit(context.Background(), blobs, nil)
1220-
require.NoError(t, err)
1221-
assert.Equal(t, uint64(42), height)
1222-
assert.Equal(t, 2, fibreSub.calls)
1223-
assert.False(t, blobSub.called)
1224-
}
1225-
1226-
func TestService_Submit_AllRegularBlobs(t *testing.T) {
1227-
fibreSub := &mockFibreSubmitter{height: 42}
1228-
blobSub := &mockSubmitter{height: 100}
1229-
1230-
svc := &Service{
1231-
blobSubmitter: blobSub,
1232-
fibreSubmitter: fibreSub,
1233-
}
1234-
svc.ctx, svc.cancel = context.WithCancel(context.Background())
1235-
defer svc.cancel()
1236-
1237-
libBlobs, err := libshare.GenerateV0Blobs([]int{10, 10}, false)
1238-
require.NoError(t, err)
1239-
blobs, err := convertBlobs(libBlobs...)
1240-
require.NoError(t, err)
1241-
1242-
height, err := svc.Submit(context.Background(), blobs, nil)
1243-
require.NoError(t, err)
1244-
assert.Equal(t, uint64(100), height)
1245-
assert.True(t, blobSub.called)
1246-
assert.Equal(t, 0, fibreSub.calls)
1247-
}
1248-
1249-
func TestService_Submit_MixedBlobsError(t *testing.T) {
1250-
svc := &Service{
1251-
blobSubmitter: &mockSubmitter{height: 100},
1252-
fibreSubmitter: &mockFibreSubmitter{height: 42},
1253-
}
1254-
svc.ctx, svc.cancel = context.WithCancel(context.Background())
1255-
defer svc.cancel()
1256-
1257-
libBlobs, err := libshare.GenerateV0Blobs([]int{10}, false)
1258-
require.NoError(t, err)
1259-
regularBlobs, err := convertBlobs(libBlobs...)
1260-
require.NoError(t, err)
1261-
1262-
mixed := []*Blob{regularBlobs[0], newTestFibreBlob(t)}
1263-
_, err = svc.Submit(context.Background(), mixed, nil)
1264-
require.ErrorIs(t, err, ErrMixedBlobTypes)
1265-
}
1266-
1267-
func TestService_Submit_FibreWithoutSubmitter(t *testing.T) {
1268-
svc := &Service{
1269-
blobSubmitter: &mockSubmitter{height: 100},
1270-
fibreSubmitter: nil,
1271-
}
1272-
svc.ctx, svc.cancel = context.WithCancel(context.Background())
1273-
defer svc.cancel()
1274-
1275-
blobs := []*Blob{newTestFibreBlob(t)}
1276-
_, err := svc.Submit(context.Background(), blobs, nil)
1277-
require.Error(t, err)
1278-
assert.Contains(t, err.Error(), "fibre submitter is not available")
1279-
}
1280-
12811156
func rawBlobSize(totalSize int) int {
12821157
return totalSize - delimLen(uint64(totalSize))
12831158
}

0 commit comments

Comments
 (0)