Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.schedule.queue;
package org.apache.iotdb.calc.execution.schedule.queue;

/** A simple interface to indicate the id type. */
public interface ID {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.schedule.queue;
package org.apache.iotdb.calc.execution.schedule.queue;

/**
* A simple interface for id getter and setter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.schedule.queue;
package org.apache.iotdb.calc.execution.schedule.queue;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -166,7 +166,7 @@ public final synchronized int size() {
*
* @return true if the queue is empty, otherwise false.
*/
protected abstract boolean isEmpty();
public abstract boolean isEmpty();

/**
* Get and remove the first element.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.schedule.queue;
package org.apache.iotdb.calc.execution.schedule.queue;

import com.google.common.base.Preconditions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,15 @@ protected abstract void writeToColumnBuilder(
protected void checkType() {
// do nothing
}

@Override
public void close() {
super.close();
for (Pair<ColumnTransformer, ColumnTransformer> whenThenColumnTransformer :
whenThenTransformers) {
whenThenColumnTransformer.left.close();
whenThenColumnTransformer.right.close();
}
elseTransformer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,11 @@ public void clearCache() {
this.leftTransformer.clearCache();
this.rightTransformer.clearCache();
}

@Override
public void close() {
super.close();
this.leftTransformer.close();
this.rightTransformer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public ColumnTransformer[] getInputColumnTransformers() {

@Override
public void close() {
super.close();
for (ColumnTransformer inputColumnTransformer : inputColumnTransformers) {
inputColumnTransformer.close();
}
// finalize executor
executor.beforeDestroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,12 @@ public void clearCache() {
columnTransformer.clearCache();
}
}

@Override
public void close() {
super.close();
for (ColumnTransformer columnTransformer : columnTransformerList) {
columnTransformer.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,12 @@ public void clearCache() {
secondColumnTransformer.clearCache();
thirdColumnTransformer.clearCache();
}

@Override
public void close() {
super.close();
firstColumnTransformer.close();
secondColumnTransformer.close();
thirdColumnTransformer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,10 @@ public void clearCache() {
super.clearCache();
childColumnTransformer.clearCache();
}

@Override
public void close() {
super.close();
childColumnTransformer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.SerializableEvent;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;

import org.apache.tsfile.utils.ReadWriteIOUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
Expand All @@ -51,7 +52,6 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
Expand All @@ -47,7 +48,6 @@
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
Expand All @@ -32,7 +33,6 @@
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

import org.apache.iotdb.commons.audit.IAuditEntity;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceAttributeUpdateNode;
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@

import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.confignode.rpc.thrift.TTableInfo;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;

import org.apache.thrift.TException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue.TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG;
import static org.apache.iotdb.calc.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
import static org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue.TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG;
import static org.apache.iotdb.rpc.TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR;

@SuppressWarnings("squid:S6548")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.TableAggregator;
import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;

import org.apache.tsfile.block.column.ColumnBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.utils.SetThreadName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingReserveQueue;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.StartupException;
Expand All @@ -37,8 +39,6 @@
import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
import org.apache.iotdb.db.queryengine.execution.exchange.IMPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingReserveQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.L1PriorityQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.multilevelqueue.DriverTaskHandle;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.multilevelqueue.MultilevelPriorityQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.multilevelqueue.MultilevelPriorityQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.utils.SetThreadName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.commons.exception.QueryTimeoutException;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.apache.iotdb.db.queryengine.execution.schedule.queue;

import org.apache.iotdb.calc.execution.schedule.queue.ID;
import org.apache.iotdb.calc.execution.schedule.queue.IDIndexedAccessible;
import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -58,7 +62,7 @@ public L1PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder)
}

@Override
protected boolean isEmpty() {
public boolean isEmpty() {
return keyedElements.isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.queryengine.execution.schedule.queue;

import org.apache.iotdb.calc.execution.schedule.queue.IndexedBlockingQueue;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -33,18 +35,22 @@
* <p>The time complexity of operations are:
*
* <ul>
* <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
* <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
* <li><b>{@link #remove(org.apache.iotdb.calc.execution.schedule.queue.IDIndexedAccessible)} ()}:
* </b> O(logN).
* <li><b>{@link #push(org.apache.iotdb.calc.execution.schedule.queue.IDIndexedAccessible)}: </b>
* O(logN).
* <li><b>{@link #poll()}: </b> O(logN).
* <li><b>{@link #get(ID)}}: </b> O(1).
* <li><b>{@link #get(org.apache.iotdb.calc.execution.schedule.queue.ID)}}: </b> O(1).
* </ul>
*/
public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
public class L2PriorityQueue<
E extends org.apache.iotdb.calc.execution.schedule.queue.IDIndexedAccessible>
extends IndexedBlockingQueue<E> {

private SortedSet<E> workingSortedElements;
private SortedSet<E> idleSortedElements;
private Map<ID, E> workingKeyedElements;
private Map<ID, E> idleKeyedElements;
private Map<org.apache.iotdb.calc.execution.schedule.queue.ID, E> workingKeyedElements;
private Map<org.apache.iotdb.calc.execution.schedule.queue.ID, E> idleKeyedElements;

/**
* Init the queue with max capacity and specified comparator.
Expand All @@ -64,15 +70,15 @@ public L2PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder)
}

@Override
protected boolean isEmpty() {
public boolean isEmpty() {
return workingKeyedElements.isEmpty() && idleKeyedElements.isEmpty();
}

@Override
protected E pollFirst() {
if (workingKeyedElements.isEmpty()) {
// Switch the two queues
Map<ID, E> tmp = workingKeyedElements;
Map<org.apache.iotdb.calc.execution.schedule.queue.ID, E> tmp = workingKeyedElements;
workingKeyedElements = idleKeyedElements;
idleKeyedElements = tmp;
SortedSet<E> tmpSet = workingSortedElements;
Expand Down
Loading
Loading