Skip to content

Commit eac9cf0

Browse files
authored
Update CDP code samples using BufferedStorageBackend (#1772)
1 parent 41129b8 commit eac9cf0

2 files changed

Lines changed: 11 additions & 12 deletions

File tree

docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,13 @@ func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {
173173

174174
ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)
175175

176-
pubConfig := PublisherConfig{
176+
pubConfig := ingest.PublisherConfig{
177177
DataStoreConfig: adapter.dataStoreConfig,
178-
BufferedStorageConfig: DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),
178+
BufferedStorageConfig: ingest.DefaultBufferedStorageBackendConfig(1),
179179
}
180180

181181
fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
182-
return ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
182+
return ingest.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
183183
func(lcm xdr.LedgerCloseMeta) error {
184184
for _, processor := range adapter.processors {
185185
if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
@@ -249,10 +249,6 @@ type = "GCS"
249249
[params]
250250
destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"
251251
252-
[schema]
253-
ledgers_per_file = 1
254-
files_per_partition = 64000
255-
256252
```
257253

258254
</CodeExample>

docs/data/indexers/build-your-own/ingest-sdk/developer_guide/ledgerbackends/bufferedstoragebackend.mdx

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,6 @@ func main() {
109109
Params: map[string]string{
110110
"destination_bucket_path": "your-gcs-bucket/data", // Replace with actual GCS bucket path
111111
},
112-
Schema: datastore.DataStoreSchema{
113-
LedgersPerFile: 1, // Defines how many ledgers per file
114-
FilesPerPartition: 64000, // Defines partition size
115-
},
116112
}
117113

118114
// Initialize the datastore
@@ -130,8 +126,15 @@ func main() {
130126
RetryWait: 5 * time.Second, // Wait time between retries
131127
}
132128

129+
// Per SEP-54 (https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0054.md),
130+
// the schema is stored in the datastore; LoadSchema retrieves it.
131+
schema, err := datastore.LoadSchema(context.Background(), dataStore, datastoreConfig)
132+
if err != nil {
133+
log.Fatal(errors.Wrap(err, "failed to retrieve datastore schema"))
134+
}
135+
133136
// Initialize the backend
134-
backend, err := ledgerbackend.NewBufferedStorageBackend(backendConfig, dataStore)
137+
backend, err := ledgerbackend.NewBufferedStorageBackend(backendConfig, dataStore, schema)
135138
if err != nil {
136139
log.Fatal(errors.Wrap(err, "failed to create buffered storage backend"))
137140
}

0 commit comments

Comments
 (0)