diff --git a/src/cmd/services/m3dbnode/config/bootstrap.go b/src/cmd/services/m3dbnode/config/bootstrap.go index db32771f67..4acf3db6f0 100644 --- a/src/cmd/services/m3dbnode/config/bootstrap.go +++ b/src/cmd/services/m3dbnode/config/bootstrap.go @@ -143,12 +143,17 @@ type BootstrapPeersConfiguration struct { // for historical data being streamed between peers (historical blocks). // Defaults to: numCPU / 2. StreamPersistShardConcurrency int `yaml:"streamPersistShardConcurrency"` + // StreamPersistShardFlushConcurrency controls how many shards in parallel to flush + // for historical data being streamed between peers (historical blocks). + // Defaults to: 1. + StreamPersistShardFlushConcurrency int `yaml:"streamPersistShardFlushConcurrency"` } func newDefaultBootstrapPeersConfiguration() BootstrapPeersConfiguration { return BootstrapPeersConfiguration{ - StreamShardConcurrency: peers.DefaultShardConcurrency, - StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency, + StreamShardConcurrency: peers.DefaultShardConcurrency, + StreamPersistShardConcurrency: peers.DefaultShardPersistenceConcurrency, + StreamPersistShardFlushConcurrency: peers.DefaultShardPersistenceFlushConcurrency, } } @@ -256,7 +261,8 @@ func (bsc BootstrapConfiguration) New( SetRuntimeOptionsManager(opts.RuntimeOptionsManager()). SetContextPool(opts.ContextPool()). SetDefaultShardConcurrency(pCfg.StreamShardConcurrency). - SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency) + SetShardPersistenceConcurrency(pCfg.StreamPersistShardConcurrency). + SetShardPersistenceFlushConcurrency(pCfg.StreamPersistShardFlushConcurrency) if err := validator.ValidatePeersBootstrapperOptions(pOpts); err != nil { return nil, err } diff --git a/src/dbnode/client/errors.go b/src/dbnode/client/errors.go index 896f1998b4..72a069a2dd 100644 --- a/src/dbnode/client/errors.go +++ b/src/dbnode/client/errors.go @@ -40,6 +40,18 @@ func IsInternalServerError(err error) bool { return false } +// InternalServerError returns an internal server error if there is one +// contained by the error. +func InternalServerError(err error) (*rpc.Error, bool) { + for err != nil { + if e, ok := err.(*rpc.Error); ok && tterrors.IsInternalError(e) { + return e, true + } + err = xerrors.InnerError(err) + } + return nil, false +} + // IsBadRequestError determines if the error is a bad request error. 
func IsBadRequestError(err error) bool { for err != nil { diff --git a/src/dbnode/client/replicated_session.go b/src/dbnode/client/replicated_session.go index 5b2f95191e..a36828c28a 100644 --- a/src/dbnode/client/replicated_session.go +++ b/src/dbnode/client/replicated_session.go @@ -296,8 +296,9 @@ func (s replicatedSession) FetchBootstrapBlocksFromPeers( shard uint32, start, end time.Time, opts result.Options, + persisting bool, ) (result.ShardResult, error) { - return s.session.FetchBootstrapBlocksFromPeers(namespace, shard, start, end, opts) + return s.session.FetchBootstrapBlocksFromPeers(namespace, shard, start, end, opts, persisting) } // FetchBootstrapBlocksMetadataFromPeers will fetch the blocks metadata from diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index c85fe493cf..ead8ca1b5f 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -24,6 +24,7 @@ import ( "bytes" "errors" "fmt" + "io" "math" "sort" "strings" @@ -35,6 +36,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/generated/proto/pagetoken" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/convert" @@ -61,6 +63,7 @@ import ( xtime "github.com/m3db/m3/src/x/time" apachethrift "github.com/apache/thrift/lib/go/thrift" + "github.com/gogo/protobuf/proto" "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" @@ -68,13 +71,11 @@ ) const ( - clusterConnectWaitInterval = 10 * time.Millisecond - blocksMetadataChannelInitialCapacity = 4096 - gaugeReportInterval = 500 * time.Millisecond - blockMetadataChBufSize = 4096 - shardResultCapacity = 4096 - hostNotAvailableMinSleepInterval = 1 * time.Millisecond - hostNotAvailableMaxSleepInterval = 100 * time.Millisecond + clusterConnectWaitInterval = 10 * time.Millisecond + gaugeReportInterval = 500 * time.Millisecond + blockMetadataChBufSize = 65536 + hostNotAvailableMinSleepInterval = 1 * time.Millisecond + hostNotAvailableMaxSleepInterval = 100 * time.Millisecond ) type resultTypeEnum string @@ -159,6 +160,7 @@ type session struct { streamBlocksBatchSize int streamBlocksMetadataBatchTimeout time.Duration streamBlocksBatchTimeout time.Duration + streamMetadataSequentialWorkers xsync.PooledWorkerPool metrics sessionMetrics } @@ -357,6 +359,19 @@ func newSession(opts Options) (clientSession, error) { s.streamBlocksMetadataBatchTimeout = opts.FetchSeriesBlocksMetadataBatchTimeout() s.streamBlocksBatchTimeout = opts.FetchSeriesBlocksBatchTimeout() s.streamBlocksRetrier = opts.StreamBlocksRetrier() + pooledWorkerPoolOpts := xsync.NewPooledWorkerPoolOptions(). + SetInstrumentOptions(opts.InstrumentOptions(). + SetMetricsScope(scope.SubScope("stream-metadata-sequential-workers"))). + SetGrowOnDemand(false). + // Make sure to emit every time a worker is in use since usage is low frequency.
+ SetInstrumentSampleRate(sampler.Rate(1)) + s.streamMetadataSequentialWorkers, err = xsync.NewPooledWorkerPool( + opts.FetchSeriesBlocksBatchConcurrency(), + pooledWorkerPoolOpts) + if err != nil { + return nil, err + } + s.streamMetadataSequentialWorkers.Init() } if runtimeOptsMgr := opts.RuntimeOptionsManager(); runtimeOptsMgr != nil { @@ -2011,14 +2026,14 @@ func (s *session) fetchBlocksMetadataFromPeers( var ( metadataCh = make(chan receivedBlockMetadata, - blocksMetadataChannelInitialCapacity) + blockMetadataChBufSize) errCh = make(chan error, 1) meta = resultTypeMetadata m = s.newPeerMetadataStreamingProgressMetrics(shard, meta) ) go func() { errCh <- s.streamBlocksMetadataFromPeers(namespace, shard, - peers, start, end, level, metadataCh, resultOpts, m) + peers, start, end, level, metadataCh, resultOpts, false, m) close(metadataCh) close(errCh) }() @@ -2035,6 +2050,7 @@ func (s *session) FetchBootstrapBlocksFromPeers( shard uint32, start, end time.Time, opts result.Options, + persisting bool, ) (result.ShardResult, error) { nsCtx, err := s.nsCtxFromMetadata(nsMetadata) if err != nil { @@ -2078,7 +2094,7 @@ func (s *session) FetchBootstrapBlocksFromPeers( errCh := make(chan error, 1) go func() { errCh <- s.streamBlocksMetadataFromPeers(nsMetadata.ID(), shard, - peers, start, end, level, metadataCh, opts, progress) + peers, start, end, level, metadataCh, opts, persisting, progress) close(metadataCh) }() @@ -2194,6 +2210,7 @@ func (s *session) streamBlocksMetadataFromPeers( level runtimeReadConsistencyLevel, metadataCh chan<- receivedBlockMetadata, resultOpts result.Options, + persisting bool, progress *streamFromPeersMetrics, ) error { var ( @@ -2268,7 +2285,13 @@ for condition() { var err error currPageToken, err = s.streamBlocksMetadataFromPeer(namespace, shardID, - peer, start, end, currPageToken, metadataCh, resultOpts, progress) + peer, start, end, currPageToken, metadataCh, resultOpts, persisting, progress) + if err != nil { + s.log.Warn("failed stream blocks metadata from peer, will retry if needed", + zap.String("peer", peer.Host().String()), + zap.Error(err)) + } + // Set error or success if err is nil errs.setError(idx, err) @@ -2324,7 +2347,263 @@ func (s *session) streamBlocksMetadataFromPeer( startPageToken pageToken, metadataCh chan<- receivedBlockMetadata, resultOpts result.Options, + persisting bool, progress *streamFromPeersMetrics, +) (pageToken, error) { + if !persisting { + // Fall back to the original implementation since the peer may have in-memory data. + return s.streamBlocksMetadataFromPeerSequential(namespace, shard, peer, + start, end, startPageToken, metadataCh, resultOpts, progress, nil) + } + + var ( + optionIncludeSizes = true + optionIncludeChecksums = true + optionIncludeLastRead = true + ) + + // Get first page with size 1 in flush phase. + token := &pagetoken.PageToken{ + FlushedSeriesPhase: &pagetoken.PageToken_FlushedSeriesPhase{}, + } + requestToken, err := proto.Marshal(token) + if err != nil { + return nil, err + } + + // First start at zero and retrieve the volume index.
+ tctx, _ := thrift.NewContext(s.streamBlocksMetadataBatchTimeout) + req := rpc.NewFetchBlocksMetadataRawV2Request() + req.NameSpace = namespace.Bytes() + req.Shard = int32(shard) + req.RangeStart = start.UnixNano() + req.RangeEnd = end.UnixNano() + req.Limit = 1 + req.PageToken = requestToken + req.IncludeSizes = &optionIncludeSizes + req.IncludeChecksums = &optionIncludeChecksums + req.IncludeLastRead = &optionIncludeLastRead + + var ( + result *rpc.FetchBlocksMetadataRawV2Result_ + ) + borrowErr := peer.BorrowConnection(func(client rpc.TChanNode) { + result, err = client.FetchBlocksMetadataRawV2(tctx, req) + }) + if borrowErr != nil { + s.log.Debug("stream meta initial probe request borrow connection error", + zap.Error(err)) + return nil, borrowErr + } + if err != nil { + s.log.Debug("stream meta initial probe request error", + zap.Error(err)) + return nil, err + } + + if result.NextPageToken == nil { + // Complete, nothing for this time range. + return nil, nil + } + + // Decode page token. + token = &pagetoken.PageToken{} + if err := proto.Unmarshal(result.NextPageToken, token); err != nil { + return nil, err + } + + // Now get the volume. + if token.ActiveSeriesPhase != nil || token.FlushedSeriesPhase == nil { + // This is an error case. + s.log.Debug("stream meta flushed phase not set for block", + zap.Error(err)) + return nil, fmt.Errorf("flushed phase not set for block: %v", start) + } + if token.FlushedSeriesPhase.CurrBlockStartUnixNanos != start.UnixNano() { + // This is an error case. + s.log.Debug("stream meta flushed phase zero value for block", + zap.Error(err)) + return nil, fmt.Errorf("flushed phase zero value for block: %v", start) + } + + // We can take the volume value directly from the token. + volume := token.FlushedSeriesPhase.Volume + + var ( + sequentialDispatches int + finalDispatch bool + prevProbePos int64 + probePos int64 + wg sync.WaitGroup + errors xerrors.MultiError + errorsLock sync.Mutex + ) + enqueueStream := func() { + // Capture vars for safe access by lambda. + sequentialDispatchFinal := finalDispatch + sequentialDispatchIndex := sequentialDispatches + + sequentialDispatches++ + + startPos := prevProbePos + limitTotal := probePos - prevProbePos + + wg.Add(1) + s.streamMetadataSequentialWorkers.Go(func() { + defer wg.Done() + + // TODO: cancel inflight metadata calls if we return early + // from an error in the dispatch loop. + _, err := s.streamBlocksMetadataFromPeerSequential(namespace, shard, peer, + start, end, nil, metadataCh, resultOpts, progress, + &streamBlocksMetadataFromPeerSequentialOptions{ + startPos: startPos, + limitTotal: limitTotal, + volume: volume, + }) + if err != nil { + errorsLock.Lock() + errors = errors.Add(err) + errorsLock.Unlock() + s.log.Debug("stream meta sequential error", + zap.Int("sequentialDispatchIndex", sequentialDispatchIndex), + zap.Bool("sequentialDispatchFinal", sequentialDispatchFinal), + zap.Int64("startPos", startPos), + zap.Int64("limitTotal", limitTotal), + zap.Int64("volume", volume), + zap.Error(err)) + } + }) + } + + // Dispatch sequential requests in parallel.
+ for { + prevProbePos = probePos + probePos += int64(s.streamBlocksBatchSize) + + token = &pagetoken.PageToken{ + FlushedSeriesPhase: &pagetoken.PageToken_FlushedSeriesPhase{ + CurrBlockStartUnixNanos: start.UnixNano(), + CurrBlockEntryIdx: int64(probePos), + Volume: volume, + }, + } + requestToken, err := proto.Marshal(token) + if err != nil { + return nil, err + } + + tctx, _ := thrift.NewContext(s.streamBlocksMetadataBatchTimeout) + req := rpc.NewFetchBlocksMetadataRawV2Request() + req.NameSpace = namespace.Bytes() + req.Shard = int32(shard) + req.RangeStart = start.UnixNano() + req.RangeEnd = end.UnixNano() + req.Limit = 1 + req.PageToken = requestToken + req.IncludeSizes = &optionIncludeSizes + req.IncludeChecksums = &optionIncludeChecksums + req.IncludeLastRead = &optionIncludeLastRead + + var ( + result *rpc.FetchBlocksMetadataRawV2Result_ + ) + borrowErr := peer.BorrowConnection(func(client rpc.TChanNode) { + result, err = client.FetchBlocksMetadataRawV2(tctx, req) + }) + if borrowErr != nil { + s.log.Debug("stream meta probe request borrow connection error", + zap.Int("sequentialDispatches", sequentialDispatches), + zap.Int64("startPos", prevProbePos), + zap.Int64("limitTotal", probePos-prevProbePos), + zap.Error(err)) + return nil, borrowErr + } + if err != nil { + if serverErr, ok := InternalServerError(err); ok { + if serverErr.Message == io.EOF.Error() { + // No more, enqueue the last fetch and we're done. + finalDispatch = true + enqueueStream() + break + } + } + + s.log.Debug("stream meta probe request error", + zap.Int("sequentialDispatches", sequentialDispatches), + zap.Int64("startPos", prevProbePos), + zap.Int64("limitTotal", probePos-prevProbePos), + zap.Error(err)) + return nil, err + } + + if result.NextPageToken == nil { + // No more, enqueue the last fetch and we're done. + finalDispatch = true + enqueueStream() + break + } + + // Decode page token. + token = &pagetoken.PageToken{} + if err := proto.Unmarshal(result.NextPageToken, token); err != nil { + return nil, err + } + + // Verify we're progressing as necessary. + if token.ActiveSeriesPhase != nil || token.FlushedSeriesPhase == nil { + // This is an error case, should always get this phase back or + // completely empty token. + s.log.Debug("stream meta probe flushed phase not set for block", + zap.Error(err)) + return nil, fmt.Errorf("flush phase not set for block start in dispatch: %v", start) + } + if token.FlushedSeriesPhase.CurrBlockStartUnixNanos != start.UnixNano() { + // This is an error case. + s.log.Debug("stream meta probe flushed phase zero value for block", + zap.Error(err)) + return nil, fmt.Errorf("flushed phase zero value for block start in dispatch: %v", start) + } + + // Keep using latest volume + volume = token.FlushedSeriesPhase.Volume + + // Enqueue next and progress. 
+ enqueueStream() + } + + wg.Wait() + + errorsLock.Lock() + defer errorsLock.Unlock() + + if errors.NumErrors() == 0 { + return nil, nil + } + + return nil, errors.FinalError() +} + +// TODO: rename from "options" to "position" +type streamBlocksMetadataFromPeerSequentialOptions struct { + startPos int64 + limitTotal int64 + volume int64 +} + +// streamBlocksMetadataFromPeerSequential has several heap allocated anonymous +// functions, however, they're only allocated once per peer/shard combination +// for the entire peer bootstrapping process so performance is acceptable. +func (s *session) streamBlocksMetadataFromPeerSequential( + namespace ident.ID, + shard uint32, + peer peer, + start, end time.Time, + startPageToken pageToken, + metadataCh chan<- receivedBlockMetadata, + resultOpts result.Options, + progress *streamFromPeersMetrics, + position *streamBlocksMetadataFromPeerSequentialOptions, ) (pageToken, error) { var ( optionIncludeSizes = true @@ -2333,6 +2612,7 @@ moreResults = true idPool = s.pools.id bytesPool = resultOpts.DatabaseBlockOptions().BytesPool() + err error // Only used for logs peerStr = peer.Host().ID() @@ -2349,6 +2629,23 @@ } }() + // If resuming at a position, then encode the start page token ourselves. + if position != nil { + token := &pagetoken.PageToken{ + FlushedSeriesPhase: &pagetoken.PageToken_FlushedSeriesPhase{ + CurrBlockStartUnixNanos: start.UnixNano(), + CurrBlockEntryIdx: position.startPos, + Volume: position.volume, + }, + } + startPageToken, err = proto.Marshal(token) + if err != nil { + return nil, err + } + } + + var totalFetched int64 + + // Declare before loop to avoid redeclaring each iteration attemptFn := func(client rpc.TChanNode) error { tctx, _ := thrift.NewContext(s.streamBlocksMetadataBatchTimeout) @@ -2357,12 +2654,19 @@ req.Shard = int32(shard) req.RangeStart = start.UnixNano() req.RangeEnd = end.UnixNano() - req.Limit = int64(s.streamBlocksBatchSize) req.PageToken = startPageToken req.IncludeSizes = &optionIncludeSizes req.IncludeChecksums = &optionIncludeChecksums req.IncludeLastRead = &optionIncludeLastRead + batchSize := int64(s.streamBlocksBatchSize) + limitCurr := batchSize + if position != nil && batchSize+totalFetched >= position.limitTotal { + limitCurr = position.limitTotal - totalFetched + } + + req.Limit = limitCurr + progress.metadataFetchBatchCall.Inc(1) result, err := client.FetchBlocksMetadataRawV2(tctx, req) if err != nil { @@ -2382,6 +2686,12 @@ moreResults = false } + totalFetched += int64(len(result.Elements)) + if position != nil && totalFetched >= position.limitTotal { + // Reached limit at this batch.
+ moreResults = false + } + for _, elem := range result.Elements { blockStart := time.Unix(0, elem.Start) @@ -3525,7 +3835,7 @@ func newBulkBlocksResult( return &bulkBlocksResult{ nsCtx: nsCtx, baseBlocksResult: newBaseBlocksResult(nsCtx, opts, resultOpts), - result: result.NewShardResult(shardResultCapacity, resultOpts), + result: result.NewShardResult(resultOpts), tagDecoderPool: tagDecoderPool, idPool: idPool, } @@ -3629,7 +3939,7 @@ type enqueueCh struct { metrics *streamFromPeersMetrics } -const enqueueChannelDefaultLen = 32768 +const enqueueChannelDefaultLen = 512 func newEnqueueChannel(m *streamFromPeersMetrics) enqueueChannel { c := &enqueueCh{ diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go index f30a196671..a3e2cd234f 100644 --- a/src/dbnode/client/types.go +++ b/src/dbnode/client/types.go @@ -209,6 +209,7 @@ type AdminSession interface { shard uint32, start, end time.Time, opts result.Options, + persisting bool, ) (result.ShardResult, error) // FetchBootstrapBlocksMetadataFromPeers will fetch the blocks metadata from diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index b0fdba6c39..827249791c 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -228,7 +228,7 @@ func newDefaultBootstrappableTestSetups( SetTopologyInitializer(topologyInitializer).(client.AdminOptions). SetOrigin(origin) - // Prevent integration tests from timing out when a node is down + // Prevent integration tests from timing out when a node is down retryOpts = xretry.NewOptions(). SetInitialBackoff(1 * time.Millisecond). SetMaxRetries(1). diff --git a/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go b/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go index ee60e9d78d..b63fe0c2bc 100644 --- a/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go +++ b/src/dbnode/integration/peers_bootstrap_high_concurrency_test.go @@ -23,6 +23,7 @@ package integration import ( + "encoding/json" "fmt" "testing" "time" @@ -30,26 +31,63 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/m3ninx/idx" + "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestPeersBootstrapHighConcurrency(t *testing.T) { + for _, test := range []testPeersBootstrapHighConcurrencyOptions{ + {BatchSize: 4096, Concurrency: 64, NumSeries: 84848}, + } { + name, err := json.Marshal(test) + require.NoError(t, err) + t.Run(string(name), func(t *testing.T) { + testPeersBootstrapHighConcurrency(t, test) + }) + } +} + +type testPeersBootstrapHighConcurrencyOptions struct { + BatchSize int + Concurrency int + NumSeries int +} + +func testPeersBootstrapHighConcurrency( + t *testing.T, + testOpts testPeersBootstrapHighConcurrencyOptions, +) { if testing.Short() { t.SkipNow() } // Test setups log := xtest.NewLogger(t) - retentionOpts := retention.NewOptions(). + + blockSize := 2 * time.Hour + + idxOpts := namespace.NewIndexOptions(). + SetEnabled(true). + SetBlockSize(blockSize) + + rOpts := retention.NewOptions(). SetRetentionPeriod(6 * time.Hour). - SetBlockSize(2 * time.Hour). + SetBlockSize(blockSize). SetBufferPast(10 * time.Minute). 
SetBufferFuture(2 * time.Minute) - namesp, err := namespace.NewMetadata(testNamespaces[0], - namespace.NewOptions().SetRetentionOptions(retentionOpts)) + + nOpts := namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(idxOpts) + + namesp, err := namespace.NewMetadata(testNamespaces[0], nOpts) require.NoError(t, err) + opts := NewTestOptions(t). SetNamespaces([]namespace.Metadata{namesp}). // Use TChannel clients for writing / reading because we want to target individual nodes at a time @@ -73,22 +111,44 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) { defer closeFn() // Write test data for first node - total := 8 * batchSize * concurrency - log.Sugar().Debugf("testing a total of %d IDs with %d batch size %d concurrency", total, batchSize, concurrency) - shardIDs := make([]string, 0, total) - for i := 0; i < total; i++ { - id := fmt.Sprintf("id.%d", i) - shardIDs = append(shardIDs, id) - } + numSeries := testOpts.NumSeries + log.Sugar().Debugf("testing a total of %d IDs with %d batch size %d concurrency", + numSeries, testOpts.BatchSize, testOpts.Concurrency) now := setups[0].NowFn()() - blockSize := retentionOpts.BlockSize() - seriesMaps := generate.BlocksByStart([]generate.BlockConfig{ - {IDs: shardIDs, NumPoints: 3, Start: now.Add(-3 * blockSize)}, - {IDs: shardIDs, NumPoints: 3, Start: now.Add(-2 * blockSize)}, - {IDs: shardIDs, NumPoints: 3, Start: now.Add(-blockSize)}, - {IDs: shardIDs, NumPoints: 3, Start: now}, - }) + commonTags := []ident.Tag{ + { + Name: ident.StringID("fruit"), + Value: ident.StringID("apple"), + }, + } + numPoints := 10 + seriesMaps := generate.BlocksByStart(blockConfigs( + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now.Add(-3 * blockSize), + }), + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now.Add(-2 * blockSize), + }), + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now.Add(-1 * blockSize), + }), + generateTaggedBlockConfigs(generateTaggedBlockConfig{ + series: numSeries, + numPoints: numPoints, + commonTags: commonTags, + blockStart: now, + }), + )) err = writeTestDataToDisk(namesp, setups[0], seriesMaps, 0) require.NoError(t, err) @@ -96,8 +156,9 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) { require.NoError(t, setups[0].StartServer()) // Start the last server with peers and filesystem bootstrappers + bootstrapStart := time.Now() require.NoError(t, setups[1].StartServer()) - log.Debug("servers are now up") + log.Debug("servers are now up", zap.Duration("took", time.Since(bootstrapStart))) // Stop the servers defer func() { @@ -111,4 +172,62 @@ func TestPeersBootstrapHighConcurrency(t *testing.T) { for _, setup := range setups { verifySeriesMaps(t, setup, namesp.ID(), seriesMaps) } + + // Issue some index queries to the second node which bootstrapped the metadata + session, err := setups[1].M3DBClient().DefaultSession() + require.NoError(t, err) + + start := now.Add(-rOpts.RetentionPeriod()) + end := now.Add(blockSize) + queryOpts := index.QueryOptions{StartInclusive: start, EndExclusive: end} + + // Match on common tags + termQuery := idx.NewTermQuery(commonTags[0].Name.Bytes(), commonTags[0].Value.Bytes()) + iter, _, err := session.FetchTaggedIDs(namesp.ID(), + index.Query{Query: termQuery}, queryOpts) + require.NoError(t, err) + defer 
iter.Finalize() + + count := 0 + for iter.Next() { + count++ + } + require.Equal(t, numSeries, count) +} + +type generateTaggedBlockConfig struct { + series int + numPoints int + commonTags []ident.Tag + blockStart time.Time +} + +func generateTaggedBlockConfigs( + cfg generateTaggedBlockConfig, +) []generate.BlockConfig { + results := make([]generate.BlockConfig, 0, cfg.series) + for i := 0; i < cfg.series; i++ { + id := fmt.Sprintf("series_%d", i) + tags := make([]ident.Tag, 0, 1+len(cfg.commonTags)) + tags = append(tags, ident.Tag{ + Name: ident.StringID("series"), + Value: ident.StringID(fmt.Sprintf("%d", i)), + }) + tags = append(tags, cfg.commonTags...) + results = append(results, generate.BlockConfig{ + IDs: []string{id}, + Tags: ident.NewTags(tags...), + NumPoints: cfg.numPoints, + Start: cfg.blockStart, + }) + } + return results +} + +func blockConfigs(cfgs ...[]generate.BlockConfig) []generate.BlockConfig { + var results []generate.BlockConfig + for _, elem := range cfgs { + results = append(results, elem...) + } + return results } diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 9186052bc7..72835ec45a 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -58,6 +58,7 @@ import ( "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" xsync "github.com/m3db/m3/src/x/sync" "github.com/stretchr/testify/require" @@ -89,8 +90,36 @@ var ( return prototest.ProtoEqual(testSchema, expect, actual) } testProtoIter = prototest.NewProtoMessageIterator(testProtoMessages) + + metricsExposeRootScope tally.Scope ) +func init() { + if strings.ToLower(os.Getenv("METRICS_EXPOSE")) == "true" { + sanitization := instrument.PrometheusMetricSanitization + extended := instrument.DetailedExtendedMetrics + cfg := instrument.MetricsConfiguration{ + Sanitization: &sanitization, + SamplingRate: 1, + PrometheusReporter: &instrument.PrometheusConfiguration{ + HandlerPath: "/metrics", + ListenAddress: "0.0.0.0:7203", + }, + ExtendedMetrics: &extended, + } + scope, closer, _, err := cfg.NewRootScopeAndReporters( + instrument.NewRootScopeAndReportersOptions{}) + if err != nil { + panic(err) + } + + defer closer.Close() + + // Assign the global var so that it gets used. + metricsExposeRootScope = scope + } +} + // nowSetterFn is the function that sets the current time type nowSetterFn func(t time.Time) @@ -255,6 +284,12 @@ func NewTestSetup( storageOpts = storageOpts.SetInstrumentOptions( storageOpts.InstrumentOptions().SetMetricsScope(scope)) + if metricsExposeRootScope != nil { + // Override if a global root scope should be used. + storageOpts = storageOpts.SetInstrumentOptions( + storageOpts.InstrumentOptions().SetMetricsScope(metricsExposeRootScope)) + } + // Use specified series cache policy from environment if set. seriesCachePolicy := strings.ToLower(os.Getenv("TEST_SERIES_CACHE_POLICY")) if seriesCachePolicy != "" { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go index 1d592dd03e..c3e3a595ef 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go @@ -48,6 +48,11 @@ var ( // src/cmd/services/m3dbnode/config package if this is changed.
DefaultShardPersistenceConcurrency = int(math.Max(1, float64(runtime.NumCPU())/2)) defaultPersistenceMaxQueueSize = 0 + // DefaultShardPersistenceFlushConcurrency controls how many shards in parallel to flush + // for historical data being streamed between peers (historical blocks). + // Update BootstrapPeersConfiguration comment in + // src/cmd/services/m3dbnode/config package if this is changed. + DefaultShardPersistenceFlushConcurrency = 1 ) var ( @@ -60,26 +65,28 @@ var ( ) type options struct { - resultOpts result.Options - client client.AdminClient - defaultShardConcurrency int - shardPersistenceConcurrency int - persistenceMaxQueueSize int - persistManager persist.Manager - runtimeOptionsManager m3dbruntime.OptionsManager - contextPool context.Pool - fsOpts fs.Options - indexOpts index.Options - compactor *compaction.Compactor + resultOpts result.Options + client client.AdminClient + defaultShardConcurrency int + shardPersistenceConcurrency int + shardPersistenceFlushConcurrency int + persistenceMaxQueueSize int + persistManager persist.Manager + runtimeOptionsManager m3dbruntime.OptionsManager + contextPool context.Pool + fsOpts fs.Options + indexOpts index.Options + compactor *compaction.Compactor } // NewOptions creates new bootstrap options. func NewOptions() Options { return &options{ - resultOpts: result.NewOptions(), - defaultShardConcurrency: DefaultShardConcurrency, - shardPersistenceConcurrency: DefaultShardPersistenceConcurrency, - persistenceMaxQueueSize: defaultPersistenceMaxQueueSize, + resultOpts: result.NewOptions(), + defaultShardConcurrency: DefaultShardConcurrency, + shardPersistenceConcurrency: DefaultShardPersistenceConcurrency, + shardPersistenceFlushConcurrency: DefaultShardPersistenceFlushConcurrency, + persistenceMaxQueueSize: defaultPersistenceMaxQueueSize, // Use a zero pool, this should be overriden at config time. contextPool: context.NewPool(context.NewOptions(). SetContextPoolOptions(pool.NewObjectPoolOptions().SetSize(0)). @@ -149,6 +156,16 @@ func (o *options) ShardPersistenceConcurrency() int { return o.shardPersistenceConcurrency } +func (o *options) SetShardPersistenceFlushConcurrency(value int) Options { + opts := *o + opts.shardPersistenceFlushConcurrency = value + return &opts +} + +func (o *options) ShardPersistenceFlushConcurrency() int { + return o.shardPersistenceFlushConcurrency +} + func (o *options) SetPersistenceMaxQueueSize(value int) Options { opts := *o opts.persistenceMaxQueueSize = value diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index d76efed0df..39b584a555 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -22,6 +22,7 @@ package peers import ( "fmt" + "io" "sync" "time" @@ -42,6 +43,7 @@ import ( "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -231,8 +233,6 @@ func (s *peersSource) readData( } var ( - namespace = nsMetadata.ID() - persistFlush persist.FlushPreparer shouldPersist = false // TODO(bodu): We should migrate to series.CacheLRU only. 
seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() @@ -242,24 +242,7 @@ func (s *peersSource) readData( if persistConfig.Enabled && (seriesCachePolicy == series.CacheRecentlyRead || seriesCachePolicy == series.CacheLRU) && persistConfig.FileSetType == persist.FileSetFlushType { - persistManager := s.opts.PersistManager() - - // Neither of these should ever happen - if seriesCachePolicy != series.CacheAll && persistManager == nil { - s.log.Fatal("tried to perform a bootstrap with persistence without persist manager") - } - - s.log.Info("peers bootstrapper resolving block retriever", zap.Stringer("namespace", namespace)) - - persist, err := persistManager.StartFlushPersist() - if err != nil { - return nil, err - } - - defer persist.DoneFlush() - shouldPersist = true - persistFlush = persist } result := result.NewDataBootstrapResult() @@ -273,13 +256,14 @@ func (s *peersSource) readData( var ( resultLock sync.Mutex wg sync.WaitGroup - persistenceWorkerDoneCh = make(chan struct{}) persistenceMaxQueueSize = s.opts.PersistenceMaxQueueSize() persistenceQueue = make(chan persistenceFlush, persistenceMaxQueueSize) resultOpts = s.opts.ResultOptions() count = shardTimeRanges.Len() concurrency = s.opts.DefaultShardConcurrency() blockSize = nsMetadata.Options().RetentionOptions().BlockSize() + persistWg = &sync.WaitGroup{} + persistClosers []io.Closer ) if shouldPersist { concurrency = s.opts.ShardPersistenceConcurrency() @@ -290,8 +274,16 @@ func (s *peersSource) readData( zap.Int("concurrency", concurrency), zap.Bool("shouldPersist", shouldPersist)) if shouldPersist { - go s.startPersistenceQueueWorkerLoop( - opts, persistenceWorkerDoneCh, persistenceQueue, persistFlush, result, &resultLock) + // Spin up persist workers. + for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ { + closer, err := s.startPersistenceQueueWorkerLoop(opts, + persistWg, persistenceQueue, result, &resultLock) + if err != nil { + return nil, err + } + + persistClosers = append(persistClosers, closer) + } } workers := xsync.NewWorkerPool(concurrency) @@ -310,31 +302,70 @@ func (s *peersSource) readData( wg.Wait() close(persistenceQueue) if shouldPersist { - // Wait for the persistenceQueueWorker to finish flushing everything - <-persistenceWorkerDoneCh + // Wait for the persistenceQueue workers to finish flushing everything. + persistWg.Wait() + + // Close any persist closers to finalize files written. 
+ for _, closer := range persistClosers { + if err := closer.Close(); err != nil { + return nil, err + } + } } return result, nil } -// startPersistenceQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that +func (s *peersSource) startPersistenceQueueWorkerLoop( + opts bootstrap.RunOptions, + persistWg *sync.WaitGroup, + persistenceQueue chan persistenceFlush, + bootstrapResult result.DataBootstrapResult, + lock *sync.Mutex, +) (io.Closer, error) { + persistMgr, err := fs.NewPersistManager(s.opts.FilesystemOptions()) + if err != nil { + return nil, err + } + + persistFlush, err := persistMgr.StartFlushPersist() + if err != nil { + return nil, err + } + + persistWg.Add(1) + go func() { + defer persistWg.Done() + s.runPersistenceQueueWorkerLoop(opts, persistenceQueue, + persistFlush, bootstrapResult, lock) + }() + + return xclose.CloserFn(persistFlush.DoneFlush), nil +} + +// runPersistenceQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that // loops through the persistenceQueue and performs a flush for each entry, ensuring that // no more than one flush is ever happening at once. Once the persistenceQueue channel // is closed, and the worker has completed flushing all the remaining entries, it will close the // provided doneCh so that callers can block until everything has been successfully flushed. -func (s *peersSource) startPersistenceQueueWorkerLoop( +func (s *peersSource) runPersistenceQueueWorkerLoop( opts bootstrap.RunOptions, - doneCh chan struct{}, persistenceQueue chan persistenceFlush, persistFlush persist.FlushPreparer, bootstrapResult result.DataBootstrapResult, lock *sync.Mutex, ) { + // Track async cleanup tasks. + asyncTasks := &sync.WaitGroup{} + + // Wait for cleanups to all occur before returning from worker. + defer asyncTasks.Wait() + // If performing a bootstrap with persistence enabled then flush one // at a time as shard results are gathered. 
for flush := range persistenceQueue { err := s.flush(opts, persistFlush, flush.nsMetadata, flush.shard, - flush.shardResult, flush.timeRange) + flush.shardResult, flush.timeRange, asyncTasks) if err == nil { continue } @@ -353,7 +384,6 @@ func (s *peersSource) startPersistenceQueueWorkerLoop( bootstrapResult.SetUnfulfilled(unfulfilled) lock.Unlock() } - close(doneCh) } // fetchBootstrapBlocksFromPeers loops through all the provided ranges for a given shard and @@ -389,7 +419,7 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( for blockStart := currRange.Start; blockStart.Before(currRange.End); blockStart = blockStart.Add(blockSize) { blockEnd := blockStart.Add(blockSize) shardResult, err := session.FetchBootstrapBlocksFromPeers( - nsMetadata, shard, blockStart, blockEnd, bopts) + nsMetadata, shard, blockStart, blockEnd, bopts, shouldPersist) s.logFetchBootstrapBlocksFromPeersOutcome(shard, shardResult, err) if err != nil { @@ -487,6 +517,7 @@ func (s *peersSource) flush( shard uint32, shardResult result.ShardResult, tr xtime.Range, + asyncTasks *sync.WaitGroup, ) error { persistConfig := opts.PersistConfig() if persistConfig.FileSetType != persist.FileSetFlushType { @@ -520,9 +551,7 @@ func (s *peersSource) flush( var ( ropts = nsMetadata.Options().RetentionOptions() blockSize = ropts.BlockSize() - flushCtx = s.opts.ContextPool().Get() ) - for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) { prepareOpts := persist.DataPrepareOptions{ NamespaceMetadata: nsMetadata, @@ -567,43 +596,25 @@ func (s *peersSource) flush( continue } - flushCtx.Reset() - stream, err := bl.Stream(flushCtx) + checksum, err := bl.Checksum() if err != nil { - flushCtx.BlockingCloseReset() blockErr = err // Need to call prepared.Close, avoid return break } - segment, err := stream.Segment() - if err != nil { - flushCtx.BlockingCloseReset() - blockErr = err // Need to call prepared.Close, avoid return - break - } + // Discard and finalize the block. + segment := bl.Discard() - checksum, err := bl.Checksum() - if err != nil { - flushCtx.BlockingCloseReset() - blockErr = err - break - } + // Remove from map. + s.Blocks.RemoveBlockAt(start) metadata := persist.NewMetadataFromIDAndTags(s.ID, s.Tags, persist.MetadataOptions{}) err = prepared.Persist(metadata, segment, checksum) - flushCtx.BlockingCloseReset() if err != nil { blockErr = err // Need to call prepared.Close, avoid return break } - - // Now that we've persisted the data to disk, we can finalize the block, - // as there is no need to keep it in memory. We do this here because it - // is better to do this as we loop to make blocks return to the pool earlier - // than all at once the end of this flush cycle. - s.Blocks.RemoveBlockAt(start) - bl.Close() } // Always close before attempting to check if block error occurred, @@ -619,38 +630,45 @@ func (s *peersSource) flush( } } - // Since we've persisted the data to disk, we don't want to keep all the series in the shard - // result. Otherwise if we leave them in, then they will all get loaded into the shard object, - // and then immediately evicted on the next tick which causes unnecessary memory pressure - // during peer bootstrapping. - numSeriesTriedToRemoveWithRemainingBlocks := 0 - for _, entry := range shardResult.AllSeries().Iter() { - series := entry.Value() - numBlocksRemaining := len(series.Blocks.AllBlocks()) - // Should never happen since we removed all the block in the previous loop and fetching - // bootstrap blocks should always be exclusive on the end side. 
- if numBlocksRemaining > 0 { - numSeriesTriedToRemoveWithRemainingBlocks++ - continue - } + // Perform cleanup asynchronously, but allow the caller to wait on the tasks. + // This allows progressing to the next flush faster. + asyncTasks.Add(1) + go func() { + defer asyncTasks.Done() + + // Since we've persisted the data to disk, we don't want to keep all the series in the shard + // result. Otherwise if we leave them in, then they will all get loaded into the shard object, + // and then immediately evicted on the next tick which causes unnecessary memory pressure + // during peer bootstrapping. + numSeriesTriedToRemoveWithRemainingBlocks := 0 + for _, entry := range shardResult.AllSeries().Iter() { + series := entry.Value() + numBlocksRemaining := len(series.Blocks.AllBlocks()) + // Should never happen since we removed all the blocks in the previous loop and fetching + // bootstrap blocks should always be exclusive on the end side. + if numBlocksRemaining > 0 { + numSeriesTriedToRemoveWithRemainingBlocks++ + continue + } - shardResult.RemoveSeries(series.ID) - series.Blocks.Close() - // Safe to finalize these IDs and Tags because the prepared object was the only other thing - // using them, and it has been closed. - series.ID.Finalize() - series.Tags.Finalize() - } - if numSeriesTriedToRemoveWithRemainingBlocks > 0 { - iOpts := s.opts.ResultOptions().InstrumentOptions() - instrument.EmitAndLogInvariantViolation(iOpts, func(l *zap.Logger) { - l.With( - zap.Int64("start", tr.Start.Unix()), - zap.Int64("end", tr.End.Unix()), - zap.Int("numTimes", numSeriesTriedToRemoveWithRemainingBlocks), - ).Error("error tried to remove series that still has blocks") - }) - } + shardResult.RemoveSeries(series.ID) + series.Blocks.Close() + // Safe to finalize these IDs and Tags because the prepared object was the only other thing + // using them, and it has been closed. + series.ID.Finalize() + series.Tags.Finalize() + } + if numSeriesTriedToRemoveWithRemainingBlocks > 0 { + iOpts := s.opts.ResultOptions().InstrumentOptions() + instrument.EmitAndLogInvariantViolation(iOpts, func(l *zap.Logger) { + l.With( + zap.Int64("start", tr.Start.Unix()), + zap.Int64("end", tr.End.Unix()), + zap.Int("numTimes", numSeriesTriedToRemoveWithRemainingBlocks), + ).Error("error tried to remove series that still has blocks") + }) + } + }() return nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go index 3e7193f607..33ebd5dcfe 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go @@ -68,6 +68,16 @@ type Options interface { // persistence enabled. ShardPersistenceConcurrency() int + // SetShardPersistenceFlushConcurrency sets the flush concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + SetShardPersistenceFlushConcurrency(value int) Options + + // ShardPersistenceFlushConcurrency returns the flush concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + ShardPersistenceFlushConcurrency() int + // SetPersistenceMaxQueueSize sets the max queue for // bootstrapping shards waiting in line to persist without blocking // the concurrent shard fetchers.
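As a rough illustrative sketch (not part of this patch), the new flush-concurrency knob composes with the existing peers bootstrapper options as follows; the function name and the concrete values are hypothetical, only the option names and the default of 1 come from this diff:

    package example

    import "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/peers"

    func newPeersBootstrapperOptions() peers.Options {
        return peers.NewOptions().
            SetDefaultShardConcurrency(16).
            SetShardPersistenceConcurrency(8).
            // Each flush worker creates its own fs persist manager (see
            // startPersistenceQueueWorkerLoop above), so values above the
            // default of 1 write multiple filesets concurrently during bootstrap.
            SetShardPersistenceFlushConcurrency(2)
    }

The matching YAML field is streamPersistShardFlushConcurrency in the peers bootstrapper configuration, which defaults to DefaultShardPersistenceFlushConcurrency (1).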
diff --git a/src/dbnode/storage/bootstrap/result/result_data.go b/src/dbnode/storage/bootstrap/result/result_data.go index 6189905bf8..51fbdcfbad 100644 --- a/src/dbnode/storage/bootstrap/result/result_data.go +++ b/src/dbnode/storage/bootstrap/result/result_data.go @@ -66,11 +66,10 @@ type shardResult struct { } // NewShardResult creates a new shard result. -func NewShardResult(capacity int, opts Options) ShardResult { +func NewShardResult(opts Options) ShardResult { return &shardResult{ opts: opts, blocks: NewMap(MapOptions{ - InitialSize: capacity, KeyCopyPool: opts.DatabaseBlockOptions().BytesPool().BytesPool(), }), } } diff --git a/src/dbnode/storage/index/convert/convert.go b/src/dbnode/storage/index/convert/convert.go index 7da0228e8e..effb5aad1c 100644 --- a/src/dbnode/storage/index/convert/convert.go +++ b/src/dbnode/storage/index/convert/convert.go @@ -27,6 +27,7 @@ import ( "unicode/utf8" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" ) @@ -211,7 +212,13 @@ func TagsFromTagsIter( idRef = true } else { if idPool != nil { - tag.Name = idPool.Clone(curr.Name) + // NB(r): Fast path: if this is a graphite tag name, reuse a + // preallocated tag name ID to save a lot of space. + if idx, ok := graphite.TagIndex(nameBytes); ok { + tag.Name = graphite.TagNameID(idx) + } else { + tag.Name = idPool.Clone(curr.Name) + } } else { copiedBytes := append([]byte(nil), curr.Name.Bytes()...) tag.Name = ident.BytesID(copiedBytes) diff --git a/src/dbnode/storage/repair.go b/src/dbnode/storage/repair.go index 202dbbe2f6..8f6eb523b2 100644 --- a/src/dbnode/storage/repair.go +++ b/src/dbnode/storage/repair.go @@ -273,8 +273,7 @@ func (r shardRepairer) Repair( // TODO(rartoul): Copying the IDs for the purposes of the map key is wasteful. Considering using // SetUnsafe or marking as NoFinalize() and making the map check IsNoFinalize(). - numMismatchSeries := seriesWithChecksumMismatches.Len() - results := result.NewShardResult(numMismatchSeries, rsOpts) + results := result.NewShardResult(rsOpts) for i, metadatasToFetchBlocksFor := range metadatasToFetchBlocksForPerSession { if len(metadatasToFetchBlocksFor) == 0 { continue } diff --git a/src/query/graphite/graphite/tags.go b/src/query/graphite/graphite/tags.go index 29be63f038..cd857a4f28 100644 --- a/src/query/graphite/graphite/tags.go +++ b/src/query/graphite/graphite/tags.go @@ -25,6 +25,7 @@ import ( "fmt" "strconv" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/unsafe" ) @@ -49,6 +50,8 @@ const ( var ( // Should never be modified after init(). preFormattedTagNames [][]byte + // Should never be modified after init(). + preFormattedTagNameIDs []ident.ID // Prefix is the prefix for graphite metrics Prefix = []byte("__g") @@ -61,6 +64,7 @@ func init() { for i := 0; i < numPreFormattedTagNames; i++ { name := generateTagName(i) preFormattedTagNames = append(preFormattedTagNames, name) + preFormattedTagNameIDs = append(preFormattedTagNameIDs, ident.BytesID(name)) } } @@ -74,6 +78,16 @@ func TagName(idx int) []byte { return []byte(fmt.Sprintf(graphiteFormat, idx)) } +// TagNameID gets a preallocated, or generates a new, tag name ID for the given +// graphite path index.
+func TagNameID(idx int) ident.ID { + if idx < len(preFormattedTagNameIDs) { + return preFormattedTagNameIDs[idx] + } + + return ident.StringID(fmt.Sprintf(graphiteFormat, idx)) +} + func generateTagName(idx int) []byte { return []byte(fmt.Sprintf(graphiteFormat, idx)) } diff --git a/src/x/sync/options.go b/src/x/sync/options.go index a417617a39..29cfd7a864 100644 --- a/src/x/sync/options.go +++ b/src/x/sync/options.go @@ -25,11 +25,13 @@ import ( "time" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/sampler" ) const ( defaultKillWorkerProbability = 0.001 defaultGrowOnDemand = false + defaultInstrumentSampleRate = sampler.Rate(0.001) ) var ( @@ -46,8 +48,9 @@ func NewPooledWorkerPoolOptions() PooledWorkerPoolOptions { growOnDemand: defaultGrowOnDemand, numShards: defaultNumShards, killWorkerProbability: defaultKillWorkerProbability, - nowFn: defaultNowFn, - iOpts: instrument.NewOptions(), + nowFn: defaultNowFn, + iOpts: instrument.NewOptions(), + instrumentSampleRate: defaultInstrumentSampleRate, } } @@ -57,6 +60,7 @@ type pooledWorkerPoolOptions struct { killWorkerProbability float64 nowFn NowFn iOpts instrument.Options + instrumentSampleRate sampler.Rate } func (o *pooledWorkerPoolOptions) SetGrowOnDemand(value bool) PooledWorkerPoolOptions { @@ -108,3 +112,13 @@ func (o *pooledWorkerPoolOptions) SetInstrumentOptions(value instrument.Options) func (o *pooledWorkerPoolOptions) InstrumentOptions() instrument.Options { return o.iOpts } + +func (o *pooledWorkerPoolOptions) SetInstrumentSampleRate(value sampler.Rate) PooledWorkerPoolOptions { + opts := *o + opts.instrumentSampleRate = value + return &opts +} + +func (o *pooledWorkerPoolOptions) InstrumentSampleRate() sampler.Rate { + return o.instrumentSampleRate +} diff --git a/src/x/sync/pooled_worker_pool.go b/src/x/sync/pooled_worker_pool.go index d65344d237..f25fdbf4d6 100644 --- a/src/x/sync/pooled_worker_pool.go +++ b/src/x/sync/pooled_worker_pool.go @@ -26,14 +26,12 @@ import ( "sync" "sync/atomic" + "github.com/m3db/m3/src/x/sampler" + "github.com/MichaelTJones/pcg" "github.com/uber-go/tally" ) -const ( - numGoroutinesGaugeSampleRate = 1000 -) - type pooledWorkerPool struct { sync.Mutex numRoutinesAtomic int64 @@ -42,9 +40,11 @@ type pooledWorkerPool struct { numWorkingRoutinesGauge tally.Gauge growOnDemand bool workChs []chan Work + samplers []*sampler.Sampler numShards int64 killWorkerProbability float64 nowFn NowFn + sampler *sampler.Sampler } // NewPooledWorkerPool creates a new worker pool. 
@@ -58,9 +58,18 @@ func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPo numShards = int64(size) } - workChs := make([]chan Work, numShards) - for i := range workChs { - workChs[i] = make(chan Work, int64(size)/numShards) + workChs := make([]chan Work, 0, numShards) + for i := 0; i < int(numShards); i++ { + workChs = append(workChs, make(chan Work, int64(size)/numShards)) + } + + samplers := make([]*sampler.Sampler, 0, numShards) + for i := 0; i < int(numShards); i++ { + sampler, err := sampler.NewSampler(opts.InstrumentSampleRate()) + if err != nil { + return nil, err + } + samplers = append(samplers, sampler) } return &pooledWorkerPool{ @@ -70,6 +79,7 @@ func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPo numWorkingRoutinesGauge: opts.InstrumentOptions().MetricsScope().Gauge("num-working-routines"), growOnDemand: opts.GrowOnDemand(), workChs: workChs, + samplers: samplers, numShards: numShards, killWorkerProbability: opts.KillWorkerProbability(), nowFn: opts.NowFn(), @@ -88,12 +98,14 @@ func (p *pooledWorkerPool) Init() { func (p *pooledWorkerPool) Go(work Work) { var ( // Use time.Now() to avoid excessive synchronization - currTime = p.nowFn().UnixNano() - workChIdx = currTime % p.numShards - workCh = p.workChs[workChIdx] + currTime = p.nowFn().UnixNano() + shardIdx = currTime % p.numShards + workCh = p.workChs[shardIdx] + sampler = p.samplers[shardIdx] ) - if currTime%numGoroutinesGaugeSampleRate == 0 { + // Use per shard sampler for less synchronization overhead. + if sampler.Sample() { p.emitNumRoutines() p.emitNumWorkingRoutines() } diff --git a/src/x/sync/types.go b/src/x/sync/types.go index a19c8a2ca7..9980b031bd 100644 --- a/src/x/sync/types.go +++ b/src/x/sync/types.go @@ -24,6 +24,7 @@ import ( "time" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/sampler" ) // Work is a unit of item to be worked on. @@ -110,4 +111,10 @@ type PooledWorkerPoolOptions interface { // InstrumentOptions returns the now function. InstrumentOptions() instrument.Options + + // SetInstrumentSampleRate sets the metrics sample rate. + SetInstrumentSampleRate(value sampler.Rate) PooledWorkerPoolOptions + + // InstrumentSampleRate returns the metrics sampler rate. + InstrumentSampleRate() sampler.Rate }
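As a minimal usage sketch of the new instrument sample rate option (mirroring the session.go wiring above; the pool size, instrument options and function name are placeholders, not part of the patch):

    package example

    import (
        "github.com/m3db/m3/src/x/instrument"
        "github.com/m3db/m3/src/x/sampler"
        xsync "github.com/m3db/m3/src/x/sync"
    )

    func newSequentialWorkers(size int, iOpts instrument.Options) (xsync.PooledWorkerPool, error) {
        opts := xsync.NewPooledWorkerPoolOptions().
            SetInstrumentOptions(iOpts).
            SetGrowOnDemand(false).
            // Rate(1) emits the routine gauges on every dispatched work item
            // rather than at the default sampler.Rate(0.001).
            SetInstrumentSampleRate(sampler.Rate(1))
        workers, err := xsync.NewPooledWorkerPool(size, opts)
        if err != nil {
            return nil, err
        }
        workers.Init()
        return workers, nil
    }

Each Go() call picks a shard from the current time and consults that shard's sampler before emitting the gauges, replacing the old global currTime % numGoroutinesGaugeSampleRate check.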