Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ type BootstrapPeersConfiguration struct {
// for historical data being streamed between peers (historical blocks).
// Defaults to: numCPU / 2.
StreamPersistShardConcurrency int `yaml:"streamPersistShardConcurrency"`
// StreamPersistShardFlushConcurrency controls how many shards in parallel to flush
// for historical data being streamed between peers (historical blocks).
// Defaults to: 1.
StreamPersistShardFlushConcurrency int `yaml:"streamPersistShardFlushConcurrency"`
}

func newDefaultBootstrapPeersConfiguration() BootstrapPeersConfiguration {
return BootstrapPeersConfiguration{
StreamShardConcurrency: peers.DefaultShardConcurrency,
StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency,
StreamShardConcurrency: peers.DefaultShardConcurrency,
StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency,
StreamPersistShardFlushConcurrency: peers.DefaultShardPersistenceFlushConcurrency,
}
}

Expand Down Expand Up @@ -256,7 +261,8 @@ func (bsc BootstrapConfiguration) New(
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetContextPool(opts.ContextPool()).
SetDefaultShardConcurrency(pCfg.StreamShardConcurrency).
SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency)
SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency).
SetShardPersistenceFlushConcurrency(pCfg.StreamPersistShardFlushConcurrency)
if err := validator.ValidatePeersBootstrapperOptions(pOpts); err != nil {
return nil, err
}
Expand Down
12 changes: 12 additions & 0 deletions src/dbnode/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ func IsInternalServerError(err error) bool {
return false
}

// InternalServerError returns an internal server error if there is one
// contained by the error.
func InternalServerError(err error) (*rpc.Error, bool) {
for err != nil {
if e, ok := err.(*rpc.Error); ok && tterrors.IsInternalError(e) {
return e, true
}
err = xerrors.InnerError(err)
}
return nil, false
}

// IsBadRequestError determines if the error is a bad request error.
func IsBadRequestError(err error) bool {
for err != nil {
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/client/replicated_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,9 @@ func (s replicatedSession) FetchBootstrapBlocksFromPeers(
shard uint32,
start, end time.Time,
opts result.Options,
persisting bool,
) (result.ShardResult, error) {
return s.session.FetchBootstrapBlocksFromPeers(namespace, shard, start, end, opts)
return s.session.FetchBootstrapBlocksFromPeers(namespace, shard, start, end, opts, persisting)
}

// FetchBootstrapBlocksMetadataFromPeers will fetch the blocks metadata from
Expand Down
Loading