Skip to content

Commit 3446a31

Browse files
authored
Pipe: Optimized the path construction efficiency in pattern match (#16265)
* refactpr * fix-optimize * optimie * fix
1 parent 1d5c8e2 commit 3446a31

8 files changed

Lines changed: 143 additions & 119 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
3030
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
3131
import org.apache.iotdb.commons.pipe.config.PipeConfig;
32+
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
3233
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
3334
import org.apache.iotdb.commons.service.IService;
3435
import org.apache.iotdb.commons.service.ServiceType;
@@ -39,6 +40,7 @@
3940
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
4041
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
4142
import org.apache.iotdb.db.service.ResourcesInformationHolder;
43+
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
4244
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
4345

4446
import org.slf4j.Logger;
@@ -69,7 +71,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
6971
//////////////////////////// System Service Interface ////////////////////////////
7072

7173
public synchronized void preparePipeResources(
72-
ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
74+
final ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
7375
// Clean sender (connector) hardlink file dir and snapshot dir
7476
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
7577

@@ -78,6 +80,9 @@ public synchronized void preparePipeResources(
7880

7981
PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
8082
simpleProgressIndexAssigner.start();
83+
84+
IoTDBTreePattern.setDevicePathGetter(CompactionPathUtils::getPath);
85+
IoTDBTreePattern.setMeasurementPathGetter(CompactionPathUtils::getPath);
8186
}
8287

8388
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java

Lines changed: 90 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -58,47 +58,47 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
5858

5959
protected final ReentrantReadWriteLock lock;
6060
private final AccessControl accessControl = Coordinator.getInstance().getAccessControl();
61-
protected final Set<PipeRealtimeDataRegionSource> extractors;
61+
protected final Set<PipeRealtimeDataRegionSource> sources;
6262

63-
protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>> deviceToExtractorsCache;
63+
protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache;
6464
protected final Cache<Pair<String, IDeviceID>, Set<PipeRealtimeDataRegionSource>>
65-
databaseAndTableToExtractorsCache;
65+
databaseAndTableToSourcesCache;
6666

6767
public CachedSchemaPatternMatcher() {
6868
this.lock = new ReentrantReadWriteLock();
69-
// Should be thread-safe because the extractors will be returned by {@link #match} and
70-
// iterated by {@link #assignToExtractor}, at the same time the extractors may be added or
69+
// Should be thread-safe because the sources will be returned by {@link #match} and
70+
// iterated by {@link #assignToSource}, at the same time the sources may be added or
7171
// removed by {@link #register} and {@link #deregister}.
72-
this.extractors = new CopyOnWriteArraySet<>();
73-
this.deviceToExtractorsCache =
72+
this.sources = new CopyOnWriteArraySet<>();
73+
this.deviceToSourcesCache =
7474
Caffeine.newBuilder()
75-
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
75+
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
7676
.build();
77-
this.databaseAndTableToExtractorsCache =
77+
this.databaseAndTableToSourcesCache =
7878
Caffeine.newBuilder()
79-
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
79+
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
8080
.build();
8181
}
8282

8383
@Override
84-
public void register(final PipeRealtimeDataRegionSource extractor) {
84+
public void register(final PipeRealtimeDataRegionSource source) {
8585
lock.writeLock().lock();
8686
try {
87-
extractors.add(extractor);
88-
deviceToExtractorsCache.invalidateAll();
89-
databaseAndTableToExtractorsCache.invalidateAll();
87+
sources.add(source);
88+
deviceToSourcesCache.invalidateAll();
89+
databaseAndTableToSourcesCache.invalidateAll();
9090
} finally {
9191
lock.writeLock().unlock();
9292
}
9393
}
9494

9595
@Override
96-
public void deregister(final PipeRealtimeDataRegionSource extractor) {
96+
public void deregister(final PipeRealtimeDataRegionSource source) {
9797
lock.writeLock().lock();
9898
try {
99-
extractors.remove(extractor);
100-
deviceToExtractorsCache.invalidateAll();
101-
databaseAndTableToExtractorsCache.invalidateAll();
99+
sources.remove(source);
100+
deviceToSourcesCache.invalidateAll();
101+
databaseAndTableToSourcesCache.invalidateAll();
102102
} finally {
103103
lock.writeLock().unlock();
104104
}
@@ -109,7 +109,7 @@ public void invalidateCache() {
109109
lock.writeLock().lock();
110110
try {
111111
// Will invalidate device cache
112-
databaseAndTableToExtractorsCache.invalidateAll();
112+
databaseAndTableToSourcesCache.invalidateAll();
113113
} finally {
114114
lock.writeLock().unlock();
115115
}
@@ -119,7 +119,7 @@ public void invalidateCache() {
119119
public int getRegisterCount() {
120120
lock.readLock().lock();
121121
try {
122-
return extractors.size();
122+
return sources.size();
123123
} finally {
124124
lock.readLock().unlock();
125125
}
@@ -128,26 +128,26 @@ public int getRegisterCount() {
128128
@Override
129129
public Pair<Set<PipeRealtimeDataRegionSource>, Set<PipeRealtimeDataRegionSource>> match(
130130
final PipeRealtimeEvent event) {
131-
final Set<PipeRealtimeDataRegionSource> matchedExtractors = new HashSet<>();
131+
final Set<PipeRealtimeDataRegionSource> matchedSources = new HashSet<>();
132132

133133
lock.readLock().lock();
134134
try {
135-
if (extractors.isEmpty()) {
136-
return new Pair<>(matchedExtractors, extractors);
135+
if (sources.isEmpty()) {
136+
return new Pair<>(matchedSources, sources);
137137
}
138138

139-
// HeartbeatEvent will be assigned to all extractors
139+
// HeartbeatEvent will be assigned to all sources
140140
if (event.getEvent() instanceof PipeHeartbeatEvent) {
141-
return new Pair<>(extractors, Collections.EMPTY_SET);
141+
return new Pair<>(sources, Collections.EMPTY_SET);
142142
}
143143

144144
// TODO: consider table pattern?
145-
// Deletion event will be assigned to extractors listened to it
145+
// Deletion event will be assigned to sources listened to it
146146
if (event.getEvent() instanceof PipeDeleteDataNodeEvent) {
147-
extractors.stream()
147+
sources.stream()
148148
.filter(PipeRealtimeDataRegionSource::shouldExtractDeletion)
149-
.forEach(matchedExtractors::add);
150-
return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors));
149+
.forEach(matchedSources::add);
150+
return new Pair<>(matchedSources, findUnmatchedSources(matchedSources));
151151
}
152152

153153
for (final Map.Entry<IDeviceID, String[]> entry : event.getSchemaInfo().entrySet()) {
@@ -158,78 +158,78 @@ public Pair<Set<PipeRealtimeDataRegionSource>, Set<PipeRealtimeDataRegionSource>
158158
|| deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
159159
|| deviceID.getTableName().equals(PATH_ROOT)) {
160160
event.markAsTreeModelEvent();
161-
matchTreeModelEvent(deviceID, entry.getValue(), matchedExtractors);
161+
matchTreeModelEvent(deviceID, entry.getValue(), matchedSources);
162162
} else {
163163
event.markAsTableModelEvent();
164164
matchTableModelEvent(
165165
event.getEvent() instanceof PipeInsertionEvent
166166
? ((PipeInsertionEvent) event.getEvent()).getTableModelDatabaseName()
167167
: null,
168168
deviceID,
169-
matchedExtractors);
169+
matchedSources);
170170
}
171171

172-
if (matchedExtractors.size() == extractors.size()) {
172+
if (matchedSources.size() == sources.size()) {
173173
break;
174174
}
175175
}
176176

177-
return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors));
177+
return new Pair<>(matchedSources, findUnmatchedSources(matchedSources));
178178
} finally {
179179
lock.readLock().unlock();
180180
}
181181
}
182182

183-
private Set<PipeRealtimeDataRegionSource> findUnmatchedExtractors(
184-
final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
185-
final Set<PipeRealtimeDataRegionSource> unmatchedExtractors = new HashSet<>();
186-
for (final PipeRealtimeDataRegionSource extractor : extractors) {
187-
if (!matchedExtractors.contains(extractor)) {
188-
unmatchedExtractors.add(extractor);
183+
private Set<PipeRealtimeDataRegionSource> findUnmatchedSources(
184+
final Set<PipeRealtimeDataRegionSource> matchedSources) {
185+
final Set<PipeRealtimeDataRegionSource> unmatchedSources = new HashSet<>();
186+
for (final PipeRealtimeDataRegionSource source : sources) {
187+
if (!matchedSources.contains(source)) {
188+
unmatchedSources.add(source);
189189
}
190190
}
191-
return unmatchedExtractors;
191+
return unmatchedSources;
192192
}
193193

194194
protected void matchTreeModelEvent(
195195
final IDeviceID device,
196196
final String[] measurements,
197-
final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
198-
// 1. try to get matched extractors from cache, if not success, match them by device
199-
final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDevice =
200-
deviceToExtractorsCache.get(device, this::filterExtractorsByDevice);
197+
final Set<PipeRealtimeDataRegionSource> matchedSources) {
198+
// 1. try to get matched sources from cache, if not success, match them by device
199+
final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
200+
deviceToSourcesCache.get(device, this::filterSourcesByDevice);
201201
// this would not happen
202-
if (extractorsFilteredByDevice == null) {
202+
if (sourcesFilteredByDevice == null) {
203203
LOGGER.warn(
204-
"Extractors filtered by device is null when matching extractors for tree model event.",
204+
"Sources filtered by device is null when matching sources for tree model event.",
205205
new Exception());
206206
return;
207207
}
208208

209-
// 2. filter matched candidate extractors by measurements
209+
// 2. filter matched candidate sources by measurements
210210
if (measurements.length == 0) {
211-
// `measurements` is empty (only in case of tsfile event). match all extractors.
211+
// `measurements` is empty (only in case of tsfile event). match all sources.
212212
//
213213
// case 1: the pattern can match all measurements of the device.
214-
// in this case, the extractor can be matched without checking the measurements.
214+
// in this case, the source can be matched without checking the measurements.
215215
//
216216
// case 2: the pattern may match some measurements of the device.
217217
// in this case, we can't get all measurements efficiently here,
218-
// so we just ASSUME the extractor matches and do more checks later.
219-
matchedExtractors.addAll(extractorsFilteredByDevice);
218+
// so we just ASSUME the source matches and do more checks later.
219+
matchedSources.addAll(sourcesFilteredByDevice);
220220
} else {
221221
// `measurements` is not empty (only in case of tablet event).
222-
// Match extractors by measurements.
223-
extractorsFilteredByDevice.forEach(
224-
extractor -> {
225-
if (matchedExtractors.size() == extractors.size()) {
222+
// Match sources by measurements.
223+
sourcesFilteredByDevice.forEach(
224+
source -> {
225+
if (matchedSources.size() == sources.size()) {
226226
return;
227227
}
228228

229-
final TreePattern pattern = extractor.getTreePattern();
229+
final TreePattern pattern = source.getTreePattern();
230230
if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(device)) {
231231
// The pattern can match all measurements of the device.
232-
matchedExtractors.add(extractor);
232+
matchedSources.add(source);
233233
} else {
234234
for (final String measurement : measurements) {
235235
// Ignore null measurement for partial insert
@@ -238,8 +238,8 @@ protected void matchTreeModelEvent(
238238
}
239239

240240
if (pattern.matchesMeasurement(device, measurement)) {
241-
matchedExtractors.add(extractor);
242-
// There would be no more matched extractors because the measurements are
241+
matchedSources.add(source);
242+
// There would be no more matched sources because the measurements are
243243
// unique
244244
break;
245245
}
@@ -249,69 +249,69 @@ protected void matchTreeModelEvent(
249249
}
250250
}
251251

252-
protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDevice(final IDeviceID device) {
253-
final Set<PipeRealtimeDataRegionSource> filteredExtractors = new HashSet<>();
252+
protected Set<PipeRealtimeDataRegionSource> filterSourcesByDevice(final IDeviceID device) {
253+
final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
254254

255-
for (final PipeRealtimeDataRegionSource extractor : extractors) {
256-
// Return if the extractor only extract deletion
257-
if (!extractor.shouldExtractInsertion()) {
255+
for (final PipeRealtimeDataRegionSource source : sources) {
256+
// Return if the source only extract deletion
257+
if (!source.shouldExtractInsertion()) {
258258
continue;
259259
}
260260

261-
final TreePattern treePattern = extractor.getTreePattern();
261+
final TreePattern treePattern = source.getTreePattern();
262262
if (Objects.isNull(treePattern)
263263
|| (treePattern.isTreeModelDataAllowedToBeCaptured()
264264
&& treePattern.mayOverlapWithDevice(device))) {
265-
filteredExtractors.add(extractor);
265+
filteredSources.add(source);
266266
}
267267
}
268268

269-
return filteredExtractors;
269+
return filteredSources;
270270
}
271271

272272
protected void matchTableModelEvent(
273273
final String databaseName,
274274
final IDeviceID tableName,
275-
final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
275+
final Set<PipeRealtimeDataRegionSource> matchedSources) {
276276
// this would not happen
277277
if (databaseName == null) {
278278
LOGGER.warn(
279-
"Database name is null when matching extractors for table model event.", new Exception());
279+
"Database name is null when matching sources for table model event.", new Exception());
280280
return;
281281
}
282282

283-
final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDatabaseAndTable =
284-
databaseAndTableToExtractorsCache.get(
285-
new Pair<>(databaseName, tableName), this::filterExtractorsByDatabaseAndTable);
283+
final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDatabaseAndTable =
284+
databaseAndTableToSourcesCache.get(
285+
new Pair<>(databaseName, tableName), this::filterSourcesByDatabaseAndTable);
286286
// this would not happen
287-
if (extractorsFilteredByDatabaseAndTable == null) {
287+
if (sourcesFilteredByDatabaseAndTable == null) {
288288
LOGGER.warn(
289-
"Extractors filtered by database and table is null when matching extractors for table model event.",
289+
"Sources filtered by database and table is null when matching sources for table model event.",
290290
new Exception());
291291
return;
292292
}
293-
matchedExtractors.addAll(extractorsFilteredByDatabaseAndTable);
293+
matchedSources.addAll(sourcesFilteredByDatabaseAndTable);
294294
}
295295

296-
protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDatabaseAndTable(
296+
protected Set<PipeRealtimeDataRegionSource> filterSourcesByDatabaseAndTable(
297297
final Pair<String, IDeviceID> databaseNameAndTableName) {
298-
final Set<PipeRealtimeDataRegionSource> filteredExtractors = new HashSet<>();
298+
final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
299299

300-
for (final PipeRealtimeDataRegionSource extractor : extractors) {
301-
// Return if the extractor only extract deletion
302-
if (!extractor.shouldExtractInsertion()) {
300+
for (final PipeRealtimeDataRegionSource source : sources) {
301+
// Return if the source only extract deletion
302+
if (!source.shouldExtractInsertion()) {
303303
continue;
304304
}
305305

306-
final TablePattern tablePattern = extractor.getTablePattern();
306+
final TablePattern tablePattern = source.getTablePattern();
307307
if (matchesTablePattern(tablePattern, databaseNameAndTableName)
308-
&& (!extractor.isSkipIfNoPrivileges()
309-
|| notFilteredByAccess(extractor.getUserName(), databaseNameAndTableName))) {
310-
filteredExtractors.add(extractor);
308+
&& (!source.isSkipIfNoPrivileges()
309+
|| notFilteredByAccess(source.getUserName(), databaseNameAndTableName))) {
310+
filteredSources.add(source);
311311
}
312312
}
313313

314-
return filteredExtractors;
314+
return filteredSources;
315315
}
316316

317317
private boolean matchesTablePattern(
@@ -335,11 +335,11 @@ private boolean notFilteredByAccess(
335335
public void clear() {
336336
lock.writeLock().lock();
337337
try {
338-
extractors.clear();
339-
deviceToExtractorsCache.invalidateAll();
340-
deviceToExtractorsCache.cleanUp();
341-
databaseAndTableToExtractorsCache.invalidateAll();
342-
databaseAndTableToExtractorsCache.cleanUp();
338+
sources.clear();
339+
deviceToSourcesCache.invalidateAll();
340+
deviceToSourcesCache.cleanUp();
341+
databaseAndTableToSourcesCache.invalidateAll();
342+
databaseAndTableToSourcesCache.cleanUp();
343343
} finally {
344344
lock.writeLock().unlock();
345345
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,9 @@
2929
import com.github.benmanes.caffeine.cache.Cache;
3030
import com.github.benmanes.caffeine.cache.Caffeine;
3131
import com.github.benmanes.caffeine.cache.Weigher;
32-
import org.slf4j.Logger;
33-
import org.slf4j.LoggerFactory;
3432

3533
/** This cache is for reducing duplicated DeviceId PartialPath initialization in write process. */
3634
public class DataNodeDevicePathCache {
37-
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeDevicePathCache.class);
3835

3936
private static final DataNodeMemoryConfig memoryConfig =
4037
IoTDBDescriptor.getInstance().getMemoryConfig();

0 commit comments

Comments
 (0)