Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;

public class DeviceSchemaInfo {

private PartialPath devicePath;
private boolean isAligned;
private List<IMeasurementSchemaInfo> measurementSchemaInfoList;
private int templateId = NON_TEMPLATE;

private DeviceSchemaInfo() {}
private final PartialPath devicePath;
private final boolean isAligned;
private final List<IMeasurementSchemaInfo> measurementSchemaInfoList;
private final int templateId;

public DeviceSchemaInfo(
PartialPath devicePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ private long evictOneCacheEntry() {
final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(belongedGroup.getFirstKey());
if (Objects.nonNull(cacheEntryGroup) && cacheEntryGroup.isEmpty()) {
// The removal is non-atomic, but it's ok because it's just a cache and does not affect the
// consistency if you evicts some entries being added
if (Objects.nonNull(firstKeyMap.remove(belongedGroup.getFirstKey()))) {
memory +=
sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public ClusterSchemaTree fetchSchema(
Set<String> storageGroupSet = new HashSet<>();
if (!explicitDevicePatternList.isEmpty()) {
for (PartialPath explicitDevicePattern : explicitDevicePatternList) {
cachedSchema = schemaCache.getMatchedSchemaWithTemplate(explicitDevicePattern);
cachedSchema = schemaCache.getMatchedTemplateSchema(explicitDevicePattern);
if (cachedSchema.isEmpty()) {
isAllCached = false;
break;
Expand All @@ -139,7 +139,7 @@ public ClusterSchemaTree fetchSchema(
continue;
}
cachedSchema =
schemaCache.getMatchedSchemaWithoutTemplate(new MeasurementPath(fullPath.getNodes()));
schemaCache.getMatchedNormalSchema(new MeasurementPath(fullPath.getNodes()));
if (cachedSchema.isEmpty()) {
isAllCached = false;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2797,12 +2797,11 @@ private UpdateLastCacheOperator createUpdateLastCacheOperator(
final boolean isNeedUpdateLastCache = context.isNeedUpdateLastCache();
if (isNeedUpdateLastCache) {
TreeDeviceSchemaCacheManager.getInstance()
.updateLastCache(
.declareLastCache(
((DataDriverContext) operatorContext.getDriverContext())
.getDataRegion()
.getDatabaseName(),
fullPath,
false);
fullPath);
}

return Objects.isNull(node.getOutputViewPath())
Expand Down Expand Up @@ -2854,13 +2853,12 @@ private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator(

for (int i = 0; i < size; ++i) {
TreeDeviceSchemaCacheManager.getInstance()
.updateLastCache(
.declareLastCache(
databaseName,
new MeasurementPath(
devicePath.concatNode(unCachedPath.getMeasurementList().get(i)),
unCachedPath.getSchemaList().get(i),
true),
false);
true));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@ public TSDataType getDataType() {

private static final Optional<Pair<OptionalLong, TsPrimitiveType[]>> HIT_AND_ALL_NULL =
Optional.of(new Pair<>(OptionalLong.empty(), null));

/** This means that the tv pair has been put, and the value is null */
public static final TimeValuePair EMPTY_TIME_VALUE_PAIR =
new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);

/** This means that the tv pair has been declared, and is ready for the next put. */
private static final TimeValuePair PLACEHOLDER_TIME_VALUE_PAIR =
new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ public ClusterSchemaTree get(final PartialPath devicePath, final String[] measur
* @param devicePath full path of the device
* @return empty if cache miss or the device path is not a template activated path
*/
public ClusterSchemaTree getMatchedSchemaWithTemplate(final PartialPath devicePath) {
public ClusterSchemaTree getMatchedTemplateSchema(final PartialPath devicePath) {
final ClusterSchemaTree tree = new ClusterSchemaTree();
final IDeviceSchema schema = tableDeviceSchemaCache.getDeviceSchema(devicePath.getNodes());
if (!(schema instanceof TreeDeviceTemplateSchema)) {
return tree;
}
final TreeDeviceTemplateSchema treeSchema = (TreeDeviceTemplateSchema) schema;
Template template = templateManager.getTemplate(treeSchema.getTemplateId());
final Template template = templateManager.getTemplate(treeSchema.getTemplateId());
tree.appendTemplateDevice(devicePath, template.isDirectAligned(), template.getId(), template);
tree.setDatabases(Collections.singleton(treeSchema.getDatabase()));
return tree;
Expand All @@ -147,7 +147,7 @@ public ClusterSchemaTree getMatchedSchemaWithTemplate(final PartialPath devicePa
* @param fullPath full path
* @return empty if cache miss
*/
public ClusterSchemaTree getMatchedSchemaWithoutTemplate(final PartialPath fullPath) {
public ClusterSchemaTree getMatchedNormalSchema(final PartialPath fullPath) {
final ClusterSchemaTree tree = new ClusterSchemaTree();
final IDeviceSchema schema =
tableDeviceSchemaCache.getDeviceSchema(
Expand Down Expand Up @@ -283,60 +283,7 @@ public List<Integer> computeWithTemplate(final ISchemaComputation computation) {
continue;
}
final IMeasurementSchema schema = templateSchema.get(measurements[i]);
computation.computeMeasurement(
i,
new IMeasurementSchemaInfo() {
@Override
public String getName() {
return schema.getMeasurementName();
}

@Override
public IMeasurementSchema getSchema() {
if (isLogicalView()) {
return new LogicalViewSchema(
schema.getMeasurementName(), ((LogicalViewSchema) schema).getExpression());
} else {
return this.getSchemaAsMeasurementSchema();
}
}

@Override
public MeasurementSchema getSchemaAsMeasurementSchema() {
return new MeasurementSchema(
schema.getMeasurementName(),
schema.getType(),
schema.getEncodingType(),
schema.getCompressor());
}

@Override
public LogicalViewSchema getSchemaAsLogicalViewSchema() {
throw new RuntimeException(
new UnsupportedOperationException(
"Function getSchemaAsLogicalViewSchema is not supported in DeviceUsingTemplateSchemaCache."));
}

@Override
public Map<String, String> getTagMap() {
return null;
}

@Override
public Map<String, String> getAttributeMap() {
return null;
}

@Override
public String getAlias() {
return null;
}

@Override
public boolean isLogicalView() {
return schema.isLogicalView();
}
});
computation.computeMeasurement(i, new WrappedSchemaInfo(schema));
}
return indexOfMissingMeasurements;
}
Expand Down Expand Up @@ -399,33 +346,46 @@ public void updateLastCacheIfExists(
*
* <p>Note: The query shall put the {@link TableDeviceLastCache} twice:
*
* <p>- First time set the "isCommit" to {@code false} before the query accesses data. It is just
* to allow the writing to update the cache, then avoid that the query put a stale value to cache
* and break the consistency. WARNING: The writing may temporarily put a stale value in cache if a
* stale value is written, but it won't affect the eventual consistency.
* <p>- First time call this before the query accesses data. It is just to allow the writing to
* update the cache, then avoid that the query put a stale value to cache and break the
* consistency. WARNING: The writing may temporarily put a stale value in cache if a stale value
* is written, but it won't affect the eventual consistency.
*
* <p>- Second time put the calculated {@link TimeValuePair}, and use {@link
* #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[], boolean,
* IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or contain {@code null},
* if the measurement is with all {@code null}s, its {@link TimeValuePair} shall be {@link
* TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed to update time column.
*
* @param database the device's database, WITH "root"
* @param measurementPath the fetched {@link MeasurementPath}
*/
public void declareLastCache(final String database, final MeasurementPath measurementPath) {
tableDeviceSchemaCache.updateLastCache(
database,
measurementPath.getIDeviceID(),
new String[] {measurementPath.getMeasurement()},
null,
measurementPath.isUnderAlignedEntity(),
new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
true);
}

/**
* Update the {@link TableDeviceLastCache} on query in tree model.
*
* <p>If the query has ended abnormally, it shall call this to invalidate the entry it has pushed
* in the first time, to avoid the stale writing damaging the eventual consistency. In this case
* and the "isInvalidate" shall be {@code true}.
* in the first time, to avoid the stale writing damaging the eventual consistency.
*
* @param database the device's database, WITH "root"
* @param measurementPath the fetched {@link MeasurementPath}
* @param isInvalidate {@code true} if invalidate the first pushed cache, or {@code null} for the
* first fetch.
*/
public void updateLastCache(
final String database, final MeasurementPath measurementPath, final boolean isInvalidate) {
public void invalidateLastCache(final String database, final MeasurementPath measurementPath) {
tableDeviceSchemaCache.updateLastCache(
database,
measurementPath.getIDeviceID(),
new String[] {measurementPath.getMeasurement()},
isInvalidate ? new TimeValuePair[] {null} : null,
new TimeValuePair[] {null},
measurementPath.isUnderAlignedEntity(),
new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
true);
Expand All @@ -446,4 +406,63 @@ public void invalidate(final List<MeasurementPath> partialPathList) {
public void cleanUp() {
tableDeviceSchemaCache.invalidateAll();
}

private static class WrappedSchemaInfo implements IMeasurementSchemaInfo {
private final IMeasurementSchema schema;

public WrappedSchemaInfo(final IMeasurementSchema schema) {
this.schema = schema;
}

@Override
public String getName() {
return schema.getMeasurementName();
}

@Override
public IMeasurementSchema getSchema() {
if (isLogicalView()) {
return new LogicalViewSchema(
schema.getMeasurementName(), ((LogicalViewSchema) schema).getExpression());
} else {
return this.getSchemaAsMeasurementSchema();
}
}

@Override
public MeasurementSchema getSchemaAsMeasurementSchema() {
return new MeasurementSchema(
schema.getMeasurementName(),
schema.getType(),
schema.getEncodingType(),
schema.getCompressor());
}

@Override
public LogicalViewSchema getSchemaAsLogicalViewSchema() {
throw new RuntimeException(
new UnsupportedOperationException(
"Function getSchemaAsLogicalViewSchema is not supported in DeviceUsingTemplateSchemaCache."));
}

@Override
public Map<String, String> getTagMap() {
return null;
}

@Override
public Map<String, String> getAttributeMap() {
return null;
}

@Override
public String getAlias() {
return null;
}

@Override
public boolean isLogicalView() {
return schema.isLogicalView();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ public void testUpdateLastCache() throws IllegalPathException {

final TimeValuePair tv1 = new TimeValuePair(1, new TsPrimitiveType.TsInt(1));

treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s1"), s1), false);
treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s3"), s3), false);
treeDeviceSchemaCacheManager.declareLastCache(
database, new MeasurementPath(device.concatNode("s1"), s1));
treeDeviceSchemaCacheManager.declareLastCache(
database, new MeasurementPath(device.concatNode("s3"), s3));

// Simulate "s1" revert when the query has failed in calculation
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
Expand All @@ -213,8 +213,8 @@ public void testUpdateLastCache() throws IllegalPathException {
},
false,
new MeasurementSchema[] {s1});
treeDeviceSchemaCacheManager.updateLastCache(
database, new MeasurementPath(device.concatNode("s1"), s1), true);
treeDeviceSchemaCacheManager.invalidateLastCache(
database, new MeasurementPath(device.concatNode("s1"), s1));

// "s2" shall be null since the "null" timeValuePair has not been put
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
Expand Down Expand Up @@ -294,12 +294,11 @@ public void testPut() throws Exception {
new MeasurementPath("root.sg1.d3.s1", TSDataType.FLOAT));
treeDeviceSchemaCacheManager.put(clusterSchemaTree);
final ClusterSchemaTree d1Tree =
treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new PartialPath("root.sg1.d1"));
treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new PartialPath("root.sg1.d1"));
final ClusterSchemaTree d2Tree =
treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new PartialPath("root.sg1.d2"));
treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new PartialPath("root.sg1.d2"));
final ClusterSchemaTree d3Tree =
treeDeviceSchemaCacheManager.getMatchedSchemaWithoutTemplate(
new MeasurementPath("root.sg1.d3.s1"));
treeDeviceSchemaCacheManager.getMatchedNormalSchema(new MeasurementPath("root.sg1.d3.s1"));
List<MeasurementPath> measurementPaths = d1Tree.searchMeasurementPaths(ALL_MATCH_PATTERN).left;
Assert.assertEquals(2, measurementPaths.size());
for (final MeasurementPath measurementPath : measurementPaths) {
Expand Down
Loading