Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/**
* Tracks table caches updated by the current thread.
*/
public final class TableCacheUpdateTracker {

private static final ThreadLocal<Tracker> CURRENT_TRACKER =
new ThreadLocal<>();
private static final Set<String> TRACKING = Collections.emptySet();

private TableCacheUpdateTracker() {
}

public static Tracker track() {
Tracker tracker = new Tracker(CURRENT_TRACKER.get());
CURRENT_TRACKER.set(tracker);
return tracker;
}

public static void recordCacheUpdate(String tableName) {
Tracker tracker = CURRENT_TRACKER.get();
if (tracker != null) {
tracker.recordCacheUpdate(tableName);
}
}

/**
* Tracks updated tables until the scope is closed.
*/
public static final class Tracker implements AutoCloseable {
private final Tracker parent;
private Set<String> tables = TRACKING;
private boolean closed;

private Tracker(Tracker parent) {
this.parent = parent;
}

public Set<String> getUpdatedTables() {
if (tables == TRACKING || tables.isEmpty()) {
return Collections.emptySet();
}
return Collections.unmodifiableSet(new LinkedHashSet<>(tables));
}

@Override
public void close() {
if (!closed) {
Tracker activeParent = getActiveParent();
if (activeParent != null) {
activeParent.addTables(tables);
}
if (CURRENT_TRACKER.get() == this) {
if (activeParent != null) {
CURRENT_TRACKER.set(activeParent);
} else {
CURRENT_TRACKER.remove();
}
}
closed = true;
}
}

private void recordCacheUpdate(String tableName) {
if (!closed && tableName != null && !tableName.isEmpty()) {
if (tables == TRACKING) {
tables = new LinkedHashSet<>();
}
tables.add(tableName);
}
}

private Tracker getActiveParent() {
Tracker current = parent;
while (current != null && current.closed) {
current = current.parent;
}
return current;
}

private void addTables(Set<String> tableNames) {
if (!closed && tableNames != TRACKING && !tableNames.isEmpty()) {
if (tables == TRACKING) {
tables = new LinkedHashSet<>();
}
tables.addAll(tableNames);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ public long getEstimatedKeyCount() throws RocksDatabaseException {
public void addCacheEntry(CacheKey<KEY> cacheKey,
CacheValue<VALUE> cacheValue) {
// This will override the entry if there is already entry for this key.
TableCacheUpdateTracker.recordCacheUpdate(getName());
cache.put(cacheKey, cacheValue);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

/**
* Tests {@link TableCacheUpdateTracker}.
*/
public class TestTableCacheUpdateTracker {

@Test
public void trackReturnsScopedTrackerWithUpdatedTables() {
try (TableCacheUpdateTracker.Tracker tracker =
TableCacheUpdateTracker.track()) {
TableCacheUpdateTracker.recordCacheUpdate("table1");
TableCacheUpdateTracker.recordCacheUpdate("table2");

assertThat(tracker.getUpdatedTables())
.containsExactly("table1", "table2");
}
}

@Test
public void closedTrackerStopsRecordingUpdates() {
TableCacheUpdateTracker.Tracker tracker =
TableCacheUpdateTracker.track();
TableCacheUpdateTracker.recordCacheUpdate("table1");

tracker.close();
TableCacheUpdateTracker.recordCacheUpdate("table2");

assertThat(tracker.getUpdatedTables()).containsExactly("table1");
}

@Test
public void nestedTrackerMergesUpdatesIntoParent() {
try (TableCacheUpdateTracker.Tracker parent =
TableCacheUpdateTracker.track()) {
TableCacheUpdateTracker.recordCacheUpdate("table1");

try (TableCacheUpdateTracker.Tracker child =
TableCacheUpdateTracker.track()) {
TableCacheUpdateTracker.recordCacheUpdate("table2");

assertThat(child.getUpdatedTables()).containsExactly("table2");
}

TableCacheUpdateTracker.recordCacheUpdate("table3");

assertThat(parent.getUpdatedTables())
.containsExactly("table1", "table2", "table3");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Testing OM DB Size Reduction After Compaction

${size_before_compaction} = Get OM DB SST Files Size

Compact OM DB Column Family keyTable
Compact OM DB Column Family fileTable
Compact OM DB Column Family deletedTable
Compact OM DB Column Family deletedDirectoryTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.utils.db.TableCacheUpdateTracker;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -100,6 +102,8 @@ default boolean isBatchSupported() {
default void updateCache(String accessId, S3SecretValue secret) {
S3SecretCache cache = cache();
if (cache != null) {
TableCacheUpdateTracker.recordCacheUpdate(
OMDBDefinition.S3_SECRET_TABLE_DEF.getName());
LOG.info("Updating cache for accessId/user: {}.", accessId);
cache.put(accessId, secret);
}
Expand All @@ -108,6 +112,8 @@ default void updateCache(String accessId, S3SecretValue secret) {
default void invalidateCacheEntry(String id) {
S3SecretCache cache = cache();
if (cache != null) {
TableCacheUpdateTracker.recordCacheUpdate(
OMDBDefinition.S3_SECRET_TABLE_DEF.getName());
cache.invalidate(id);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -44,7 +43,6 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.S3SecretManager;
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
Expand Down Expand Up @@ -473,26 +471,9 @@ private static boolean isStandaloneBatchCmdTypes(OMResponse response) {
}

private void addCleanupEntry(Entry entry, Map<String, List<Long>> cleanupEpochs) {
Class<? extends OMClientResponse> responseClass =
entry.getResponse().getClass();
CleanupTableInfo cleanupTableInfo =
responseClass.getAnnotation(CleanupTableInfo.class);
if (cleanupTableInfo != null) {
final List<String> cleanupTables;
if (cleanupTableInfo.cleanupAll()) {
cleanupTables = OMDBDefinition.get().getColumnFamilyNames();
} else {
cleanupTables = Arrays.asList(cleanupTableInfo.cleanupTables());
}
for (String table : cleanupTables) {
cleanupEpochs.computeIfAbsent(table, list -> new ArrayList<>())
.add(entry.getTermIndex().getIndex());
}
} else {
// This is to catch early errors, when a new response class missed to
// add CleanupTableInfo annotation.
throw new RuntimeException("CleanupTableInfo Annotation is missing " +
"for" + responseClass);
for (String table : entry.getResponse().getCleanupTables()) {
cleanupEpochs.computeIfAbsent(table, list -> new ArrayList<>())
.add(entry.getTermIndex().getIndex());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.TableCacheUpdateTracker;
import org.apache.hadoop.hdds.utils.db.TableCacheUpdateTracker.Tracker;
import org.apache.hadoop.ipc_.ProtobufRpcEngine;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -149,7 +151,11 @@ public void handleRequestFailure(OzoneManager ozoneManager) {
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, long transactionLogIndex) {
ExecutionContext context = ExecutionContext.of(transactionLogIndex,
TransactionInfo.getTermIndex(transactionLogIndex));
return validateAndUpdateCache(ozoneManager, context);
try (Tracker tracker = TableCacheUpdateTracker.track()) {
OMClientResponse response = validateAndUpdateCache(ozoneManager, context);
response.addCleanupTables(tracker.getUpdatedTables());
return response;
}
}

@VisibleForTesting
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
/**
* A dummy OMClientResponse implementation.
*/
@CleanupTableInfo
public class DummyOMClientResponse extends OMClientResponse {

public DummyOMClientResponse(@Nonnull OMResponse omResponse) {
Expand Down
Loading