Skip to content

Commit 822b9d3

Browse files
gbbafnaBukhtawar
andauthored
[Backport 2.x] Repository stats for remote store (opensearch-project#10715)
* Repository stats for remote store (opensearch-project#10567) Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com> * Changing version for repo stats blob Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> * spotless fixes Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> * Muting s3 request stats test (opensearch-project#10736) Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> --------- Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com> Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> Signed-off-by: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent 4a0b3c3 commit 822b9d3

30 files changed

Lines changed: 479 additions & 204 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- [Admission control] Add Resource usage collector service and resource usage tracker ([#10695](https://github.com/opensearch-project/OpenSearch/pull/10695))
99
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10696](https://github.com/opensearch-project/OpenSearch/pull/10696))
1010
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
11+
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
1112
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
1213
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
1314

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;
3939

4040
import org.opensearch.action.ActionRunnable;
41+
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
4142
import org.opensearch.action.support.PlainActionFuture;
43+
import org.opensearch.cluster.metadata.IndexMetadata;
4244
import org.opensearch.cluster.metadata.RepositoryMetadata;
4345
import org.opensearch.cluster.service.ClusterService;
4446
import org.opensearch.common.SuppressForbidden;
@@ -57,13 +59,17 @@
5759
import org.opensearch.indices.recovery.RecoverySettings;
5860
import org.opensearch.plugins.Plugin;
5961
import org.opensearch.repositories.RepositoriesService;
62+
import org.opensearch.repositories.Repository;
6063
import org.opensearch.repositories.RepositoryData;
64+
import org.opensearch.repositories.RepositoryMissingException;
65+
import org.opensearch.repositories.RepositoryStats;
6166
import org.opensearch.repositories.blobstore.BlobStoreRepository;
6267
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
6368
import org.opensearch.repositories.s3.utils.AwsRequestSigner;
6469
import org.opensearch.snapshots.SnapshotId;
6570
import org.opensearch.snapshots.SnapshotsService;
6671
import org.opensearch.snapshots.mockstore.BlobStoreWrapper;
72+
import org.opensearch.test.BackgroundIndexer;
6773
import org.opensearch.test.OpenSearchIntegTestCase;
6874
import org.opensearch.threadpool.ThreadPool;
6975

@@ -73,12 +79,18 @@
7379
import java.util.ArrayList;
7480
import java.util.Collection;
7581
import java.util.Collections;
82+
import java.util.HashMap;
7683
import java.util.List;
7784
import java.util.Map;
85+
import java.util.Objects;
86+
import java.util.stream.StreamSupport;
7887

7988
import fixture.s3.S3HttpHandler;
8089

90+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
91+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
8192
import static org.hamcrest.Matchers.containsString;
93+
import static org.hamcrest.Matchers.equalTo;
8294
import static org.hamcrest.Matchers.greaterThan;
8395
import static org.hamcrest.Matchers.lessThan;
8496

@@ -216,6 +228,67 @@ public void testEnforcedCooldownPeriod() throws IOException {
216228
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
217229
}
218230

231+
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10735")
232+
@Override
233+
public void testRequestStats() throws Exception {
234+
final String repository = createRepository(randomName());
235+
final String index = "index-no-merges";
236+
createIndex(
237+
index,
238+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
239+
);
240+
241+
final long nbDocs = randomLongBetween(10_000L, 20_000L);
242+
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
243+
waitForDocs(nbDocs, indexer);
244+
}
245+
246+
flushAndRefresh(index);
247+
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
248+
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
249+
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
250+
251+
final String snapshot = "snapshot";
252+
assertSuccessfulSnapshot(
253+
client().admin().cluster().prepareCreateSnapshot(repository, snapshot).setWaitForCompletion(true).setIndices(index)
254+
);
255+
256+
assertAcked(client().admin().indices().prepareDelete(index));
257+
258+
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
259+
ensureGreen(index);
260+
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
261+
262+
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
263+
264+
final RepositoryStats repositoryStats = StreamSupport.stream(
265+
internalCluster().getInstances(RepositoriesService.class).spliterator(),
266+
false
267+
).map(repositoriesService -> {
268+
try {
269+
return repositoriesService.repository(repository);
270+
} catch (RepositoryMissingException e) {
271+
return null;
272+
}
273+
}).filter(Objects::nonNull).map(Repository::stats).reduce(RepositoryStats::merge).get();
274+
275+
Map<BlobStore.Metric, Map<String, Long>> extendedStats = repositoryStats.extendedStats;
276+
Map<String, Long> aggregatedStats = new HashMap<>();
277+
extendedStats.forEach((k, v) -> {
278+
if (k == BlobStore.Metric.RETRY_COUNT || k == BlobStore.Metric.REQUEST_SUCCESS || k == BlobStore.Metric.REQUEST_FAILURE) {
279+
for (Map.Entry<String, Long> entry : v.entrySet()) {
280+
aggregatedStats.merge(entry.getKey(), entry.getValue(), Math::addExact);
281+
}
282+
}
283+
284+
});
285+
final Map<String, Long> mockCalls = getMockRequestCounts();
286+
287+
String assertionErrorMsg = String.format("SDK sent [%s] calls and handler measured [%s] calls", aggregatedStats, mockCalls);
288+
289+
assertEquals(assertionErrorMsg, mockCalls, aggregatedStats);
290+
}
291+
219292
/**
220293
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
221294
*/
@@ -327,6 +400,8 @@ public void maybeTrack(final String request, Headers requestHeaders) {
327400
trackRequest("PutMultipartObject");
328401
} else if (Regex.simpleMatch("PUT /*/*", request)) {
329402
trackRequest("PutObject");
403+
} else if (Regex.simpleMatch("POST /*?delete*", request)) {
404+
trackRequest("DeleteObjects");
330405
}
331406
}
332407

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
199199
? amazonS3Reference.get().priorityClient()
200200
: amazonS3Reference.get().client();
201201
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
202-
.uploadObject(s3AsyncClient, uploadRequest, streamContext);
202+
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
203203
completableFuture.whenComplete((response, throwable) -> {
204204
if (throwable == null) {
205205
completionListener.onResponse(response);
@@ -384,7 +384,7 @@ private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOEx
384384
assert outstanding.isEmpty();
385385
}
386386

387-
private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
387+
private DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
388388
return DeleteObjectsRequest.builder()
389389
.bucket(bucket)
390390
.delete(
@@ -393,6 +393,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
393393
.quiet(true)
394394
.build()
395395
)
396+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
396397
.build();
397398
}
398399

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import org.opensearch.repositories.s3.async.AsyncTransferManager;
4848

4949
import java.io.IOException;
50+
import java.util.Collections;
51+
import java.util.HashMap;
5052
import java.util.Locale;
5153
import java.util.Map;
5254

@@ -180,6 +182,16 @@ public Map<String, Long> stats() {
180182
return statsMetricPublisher.getStats().toMap();
181183
}
182184

185+
@Override
186+
public Map<Metric, Map<String, Long>> extendedStats() {
187+
if (statsMetricPublisher.getExtendedStats() == null || statsMetricPublisher.getExtendedStats().isEmpty()) {
188+
return Collections.emptyMap();
189+
}
190+
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
191+
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
192+
return extendedStats;
193+
}
194+
183195
public ObjectCannedACL getCannedACL() {
184196
return cannedACL;
185197
}

0 commit comments

Comments
 (0)