External Sort Operator
The external sort operator works like other Drill operators in its use of an incoming batch and the `next()` call hierarchy, and in that the operator is itself a record batch for its "output" (downstream, up-the-call-stack) operator. The external sort operator is interesting for two key reasons:
- It buffers all the incoming rows so that they can be sorted together, and
- It optionally spills data to disk due to memory pressure.
Like all operators, the external sort (xsort) operator does the following:
- Holds an incoming operator (`incoming`).
- Calls `incoming.next()` to obtain a record batch.
- Supports an incoming batch with an associated selection vector. (?)
Like all operators, the xsort operator maintains a set of internal states and performs state transitions based on the outcomes returned by the incoming batch.
Incoming events are defined by the `RecordBatch.IterOutcome` enum:
- `NONE`: Normal completion: the incoming batch has no more data.
- `OK`: Zero or more records with the same schema as the previous batch.
- `OK_NEW_SCHEMA`: Zero or more records with a schema different from that of the previous batch.
- `STOP`: Abnormal termination. The xsort operator simply forwards this event downstream.
- `NOT_YET`: No data available yet. Not supported by the xsort operator (throws an `UnsupportedOperationException`).
- `OUT_OF_MEMORY`: Non-fatal out-of-memory condition.
- Exception: Not an enum event, but any method can throw an exception.
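The outcome handling above can be sketched as a pull loop. This is a minimal, hypothetical model, not Drill's operator code: `IterOutcome` here is a local enum that only mirrors the names of `RecordBatch.IterOutcome`, and `ScriptedIncoming` is an invented stand-in for the upstream operator. Two behaviors are assumptions, since this page does not specify them: the sorted output is announced with `OK_NEW_SCHEMA`, and `OUT_OF_MEMORY` is forwarded unchanged.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

class XsortOutcomeSketch {
    // Local stand-in mirroring the names of Drill's RecordBatch.IterOutcome.
    enum IterOutcome { NONE, OK, OK_NEW_SCHEMA, STOP, NOT_YET, OUT_OF_MEMORY }

    // Hypothetical upstream operator that replays a scripted sequence of outcomes.
    static final class ScriptedIncoming {
        private final Deque<IterOutcome> script;

        ScriptedIncoming(List<IterOutcome> outcomes) { this.script = new ArrayDeque<>(outcomes); }

        IterOutcome next() { return script.isEmpty() ? IterOutcome.NONE : script.poll(); }
    }

    private final ScriptedIncoming incoming;
    private int bufferedBatches = 0;
    private boolean sorted = false;

    XsortOutcomeSketch(ScriptedIncoming incoming) { this.incoming = incoming; }

    int bufferedBatches() { return bufferedBatches; }

    /** Pulls from upstream until input ends or fails; returns the outcome sent downstream. */
    IterOutcome next() {
        if (sorted) {
            return IterOutcome.NONE; // the single sorted result has already been emitted
        }
        while (true) {
            IterOutcome upstream = incoming.next();
            switch (upstream) {
                case OK:
                case OK_NEW_SCHEMA:
                    bufferedBatches++; // buffer every batch so all rows can be sorted together
                    break;             // ...and keep pulling
                case NONE:
                    sorted = true;     // all input seen: the real operator sorts and merges here
                    // ASSUMPTION: the first output batch announces its schema with OK_NEW_SCHEMA.
                    return bufferedBatches == 0 ? IterOutcome.NONE : IterOutcome.OK_NEW_SCHEMA;
                case STOP:
                    return IterOutcome.STOP; // forwarded downstream unchanged
                case NOT_YET:
                    throw new UnsupportedOperationException("NOT_YET is not supported by xsort");
                case OUT_OF_MEMORY:
                    return IterOutcome.OUT_OF_MEMORY; // ASSUMPTION: forwarded; the page does not say
                default:
                    throw new IllegalStateException("Unexpected outcome: " + upstream);
            }
        }
    }

    public static void main(String[] args) {
        XsortOutcomeSketch op = new XsortOutcomeSketch(new ScriptedIncoming(
                List.of(IterOutcome.OK_NEW_SCHEMA, IterOutcome.OK, IterOutcome.NONE)));
        IterOutcome first = op.next();
        System.out.println(first + " after buffering " + op.bufferedBatches() + " batches");
    }
}
```

Because the sort needs every row, the loop keeps calling `incoming.next()` until `NONE` before producing any output; this is what makes xsort a blocking operator rather than a streaming one.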
Sorting is done via an indirection vector: the data in the value vectors does not move during the sort. Instead, the xsort operator uses an int-based selection vector (`SelectionVector4 sv4`) to hold indexes to the data. During the sort, the indexes are reordered, not the vector data.
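The indirection idea can be illustrated in plain Java: only an array of packed indexes is reordered, while the value arrays stay put. Plain `int[]` arrays stand in for value vectors, and the packing (batch index in the upper 16 bits, record offset in the lower 16 bits) is assumed to mirror how `SelectionVector4` encodes its entries; treat it as an illustration, not Drill's code.

```java
import java.util.Arrays;
import java.util.Comparator;

class IndirectSortSketch {
    // Pack a (batch, record) pair into one int: batch index in the upper 16 bits,
    // record offset in the lower 16 bits (assumed to mirror SelectionVector4's layout).
    static int pack(int batch, int record) { return (batch << 16) | (record & 0xFFFF); }

    static int batchOf(int packed) { return packed >>> 16; }

    static int recordOf(int packed) { return packed & 0xFFFF; }

    /** Returns packed indexes ordered by value; the batches themselves are never modified. */
    static int[] sortIndexes(int[][] batches) {
        int total = 0;
        for (int[] batch : batches) {
            total += batch.length;
        }
        Integer[] sv4 = new Integer[total];
        int n = 0;
        for (int b = 0; b < batches.length; b++) {
            for (int r = 0; r < batches[b].length; r++) {
                sv4[n++] = pack(b, r);
            }
        }
        // Only the index array is reordered; each comparison looks through it to the data.
        Arrays.sort(sv4, Comparator.comparingInt(p -> batches[batchOf(p)][recordOf(p)]));
        return Arrays.stream(sv4).mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        int[][] batches = { {30, 10}, {20, 40} };
        for (int p : sortIndexes(batches)) {
            System.out.print(batches[batchOf(p)][recordOf(p)] + " ");
        }
        System.out.println();
    }
}
```

Boxing to `Integer[]` keeps the sketch short; an implementation over real vectors would sort primitive indexes in place to avoid the allocation.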
Batches are sorted individually using a `SingleBatchSorter` instance (`sorter`). Batches are then merged (how?)
The xsort operator does not support schema changes (though some parts of the code seem to be written in anticipation of such support). The operator ignores "trivial" schema changes, in which the new schema is the same as the previous one. However, if the schema really does change, the operator throws a `SchemaChangeException`.
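A minimal sketch of this schema check, assuming schemas can be compared for equality. The `List<String>` schema model and the nested `SchemaChangeException` are hypothetical stand-ins for Drill's own schema and exception classes.

```java
import java.util.List;

class SchemaGuardSketch {
    // Hypothetical stand-in for Drill's (checked) SchemaChangeException.
    static class SchemaChangeException extends Exception {
        SchemaChangeException(String message) { super(message); }
    }

    // Schema of the first batch, modeled as a list of "name:type" strings (an assumption).
    private List<String> schema;

    /** Checks each incoming batch's schema against the first one seen. */
    void checkSchema(List<String> incoming) throws SchemaChangeException {
        if (schema == null) {
            schema = List.copyOf(incoming); // the first batch defines the schema
        } else if (!schema.equals(incoming)) {
            throw new SchemaChangeException(
                    "Sort does not support schema changes: " + schema + " -> " + incoming);
        }
        // Otherwise the "change" is trivial (same schema) and is ignored.
    }

    public static void main(String[] args) {
        SchemaGuardSketch guard = new SchemaGuardSketch();
        try {
            guard.checkSchema(List.of("id:INT", "name:VARCHAR"));
            guard.checkSchema(List.of("id:INT", "name:VARCHAR"));    // trivial: accepted
            guard.checkSchema(List.of("id:BIGINT", "name:VARCHAR")); // real change: throws
        } catch (SchemaChangeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```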
PriorityQueueCopier
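This page does not yet describe how the individually sorted batches are merged. Judging by its name, `PriorityQueueCopier` presumably performs a priority-queue (heap) k-way merge. The sketch below shows that generic technique on plain sorted `int[]` runs; it is an assumption about the approach, not a transcription of Drill's copier, and `KWayMergeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch {
    /** Merges already-sorted runs into one sorted list using a min-heap of run cursors. */
    static List<Integer> merge(List<int[]> runs) {
        // Each heap entry is {value, runIndex, offsetInRun}; the heap orders entries by value.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (runs.get(r).length > 0) {
                heap.add(new int[] {runs.get(r)[0], r, 0}); // seed with the head of each run
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();  // smallest head among all runs
            out.add(top[0]);          // "copy" it to the output
            int[] run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.length) {
                heap.add(new int[] {run[next], top[1], next}); // advance that run's cursor
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(new int[] {1, 4, 7}, new int[] {2, 5}, new int[] {3, 6, 8})));
    }
}
```

Each `poll` and `add` costs O(log k) for k runs, so merging n rows costs O(n log k), which is why a heap is the usual choice for merging many sorted batches or spill files.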
The operator spills to disk (when? batch group size? threshold?)
Disk management is integrated into the xsort operator class itself. The algorithm is:
- Use the HDFS `FileSystem` class, referenced by `fs`, to manage spill files. This allows spill files to be written to the DFS file system as well as to local disk, which matters when cluster nodes are configured with limited local storage: large spill files can then be written to DFS instead.
- Keep a list of spill directories (`dirs`) to which spill files are written (in round-robin manner?)
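If spill files are in fact distributed round-robin (the open question above), directory selection could look like the following hypothetical helper. `SpillDirRotationSketch`, `nextSpillDir()`, and the example paths are invented for illustration and are not Drill API.

```java
import java.util.List;

class SpillDirRotationSketch {
    private final List<String> dirs; // the configured spill directories (hypothetical model)
    private int nextDir = 0;

    SpillDirRotationSketch(List<String> dirs) {
        if (dirs.isEmpty()) {
            throw new IllegalArgumentException("at least one spill directory is required");
        }
        this.dirs = List.copyOf(dirs);
    }

    /** Returns the directory for the next spill file, cycling through dirs in order. */
    String nextSpillDir() {
        String dir = dirs.get(nextDir);
        nextDir = (nextDir + 1) % dirs.size();
        return dir;
    }

    public static void main(String[] args) {
        SpillDirRotationSketch rotation = new SpillDirRotationSketch(List.of("/tmp/spill-a", "/tmp/spill-b"));
        for (int i = 0; i < 3; i++) {
            System.out.println(rotation.nextSpillDir());
        }
    }
}
```

Rotating directories spreads spill I/O and disk usage across the configured locations instead of filling one volume first.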
Individual spill files have the following name:
`<query-id>_majorfragment<major-fragment-id>_minorfragment<minor-fragment-id>_operator<operator-id>`
where the items in `<...>` are parameters of the particular operator.
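The naming pattern maps directly onto `String.format`. In this sketch `spillFileName` is a hypothetical helper and the query ID is taken as an already-formatted string (`"q1"` is a placeholder); how the real operator renders the query ID and combines the name with a spill directory is not covered here.

```java
class SpillFileNameSketch {
    /** Builds a spill file name following the pattern above (hypothetical helper). */
    static String spillFileName(String queryId, int majorFragmentId, int minorFragmentId, int operatorId) {
        return String.format("%s_majorfragment%d_minorfragment%d_operator%d",
                queryId, majorFragmentId, minorFragmentId, operatorId);
    }

    public static void main(String[] args) {
        // Placeholder query ID; real query IDs are formatted by Drill itself.
        System.out.println(spillFileName("q1", 2, 0, 5));
    }
}
```

Encoding the fragment and operator IDs in the name keeps spill files from concurrent sort operators of the same query from colliding in a shared directory.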
Configuration options (and the members that hold them):
- `drill.exec.sort.external.spill.group.size` (member `SPILL_BATCH_GROUP_SIZE`)
- `drill.exec.sort.external.spill.threshold` (member `EXTERNAL_SORT_SPILL_THRESHOLD`)
- `drill.exec.sort.external.spill.directories` (member `dirs`)
- MappingSet
- Dir algorithm