External Sort Operator

Paul Rogers edited this page Oct 31, 2016 · 12 revisions

The external sort operator works like other Drill operators in its use of an incoming batch and the next( ) call hierarchy, and in that the operator is itself a record batch for its "output" (downstream, up-the-call-stack) operator. The external sort operator is interesting for two key reasons:

  1. It buffers all the incoming rows so that they can be sorted together, and
  2. It optionally spills data to disk due to memory pressure.

General Flow

Like all operators, the external sort (xsort) operator does the following:

  • Holds an incoming operator (incoming).
  • Calls incoming.next( ) to obtain a record batch.
  • Supports an incoming batch with an associated selection vector. (?)

States

Like all operators, the xsort operator maintains a set of internal states, and performs state transitions based on responses from the incoming batch.

Incoming events are defined by the RecordBatch.IterOutcome enum:

  • NONE: Normal completion; the incoming operator has no more batches.
  • OK: Zero or more records with schema same as the previous batch.
  • OK_NEW_SCHEMA: Zero or more records with a schema different from that of the previous batch.
  • STOP: Abnormal termination. The xsort operator simply forwards this event downstream.
  • NOT_YET: No data. Not supported by the xsort operator. (Throws an UnsupportedOperationException.)
  • OUT_OF_MEMORY: Non-fatal OOM.
  • Exception: Not an enum event, but all methods can throw an exception.
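The event list above can be sketched as a dispatch on the incoming outcome. This is a minimal illustration, not the operator's actual code: the IterOutcome enum below is a hypothetical local stand-in for RecordBatch.IterOutcome, the Action names are invented for the sketch, and the source does not say how OUT_OF_MEMORY is handled, so that branch is only a placeholder.

```java
// Hypothetical stand-in for Drill's RecordBatch.IterOutcome (values taken from the list above).
enum IterOutcome { NONE, OK, OK_NEW_SCHEMA, STOP, NOT_YET, OUT_OF_MEMORY }

class OutcomeDispatchSketch {
  // Invented names describing what the sketch does for each outcome.
  enum Action { BUFFER_BATCH, CHECK_SCHEMA_THEN_BUFFER, SORT_BUFFERED_DATA, FORWARD_STOP, HANDLE_OOM }

  static Action onOutcome(IterOutcome outcome) {
    switch (outcome) {
      case OK:
        return Action.BUFFER_BATCH;             // same schema: keep buffering rows
      case OK_NEW_SCHEMA:
        return Action.CHECK_SCHEMA_THEN_BUFFER; // schema must be verified first
      case NONE:
        return Action.SORT_BUFFERED_DATA;       // input is exhausted
      case STOP:
        return Action.FORWARD_STOP;             // simply forwarded downstream
      case OUT_OF_MEMORY:
        return Action.HANDLE_OOM;               // assumption: handling is not described in the text
      case NOT_YET:
        throw new UnsupportedOperationException("NOT_YET is not supported by xsort");
      default:
        throw new IllegalStateException("Unexpected outcome: " + outcome);
    }
  }

  public static void main(String[] args) {
    for (IterOutcome o : new IterOutcome[] {IterOutcome.OK, IterOutcome.NONE, IterOutcome.STOP}) {
      System.out.println(o + " -> " + onOutcome(o));
    }
  }
}
```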

Sort Implementation

Sorting is done via an indirection vector. That is, the data in the vectors does not move during the sort. Instead, the xsort operator uses an int-based selection vector (SelectionVector4 sv4) to hold indexes to the data. During the sort, the indexes are reordered, not the vector data.
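To illustrate the idea of sorting through an indirection vector, here is a minimal sketch that uses a plain int array in place of a value vector and a second int array in place of the selection vector. It shows only the general technique; it does not use SelectionVector4 or any Drill class, and the method name is hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;

class IndirectSortSketch {
  /** Returns row indexes ordered by the values they point to; `values` itself is never modified. */
  static int[] sortedIndexes(int[] values) {
    Integer[] boxed = new Integer[values.length];
    for (int i = 0; i < boxed.length; i++) {
      boxed[i] = i;                              // start with the identity ordering
    }
    // Reorder the indexes by comparing the values they refer to.
    Arrays.sort(boxed, Comparator.comparingInt(i -> values[i]));
    int[] indexes = new int[boxed.length];
    for (int i = 0; i < boxed.length; i++) {
      indexes[i] = boxed[i];
    }
    return indexes;
  }

  public static void main(String[] args) {
    int[] column = {30, 10, 20};
    int[] sv = sortedIndexes(column);
    System.out.println(Arrays.toString(sv));     // [1, 2, 0]
    System.out.println(Arrays.toString(column)); // [30, 10, 20] (data did not move)
  }
}
```

Reading the data in sorted order then means walking the index array and looking up column[sv[i]] for each position.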

Batches are sorted individually using a SingleBatchSorter instance (sorter). Batches are merged (how?)

Schema Change Handling

The xsort operator does not support schema changes (though some parts of the code seem to be written in anticipation of such support). The operator ignores "trivial" schema changes (where the new schema is the same as the previous one). However, if the schema really does change, the operator throws a SchemaChangeException.
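The rule above can be sketched as a simple equality check. This is an assumption-laden illustration: a schema is modeled here as a list of "name:type" strings, and both the accept method and the nested SchemaChangeException class are hypothetical stand-ins, not Drill's own types.

```java
import java.util.Arrays;
import java.util.List;

class SchemaCheckSketch {
  /** Hypothetical local stand-in for Drill's SchemaChangeException. */
  static class SchemaChangeException extends Exception {
    SchemaChangeException(String message) {
      super(message);
    }
  }

  /**
   * Returns the schema to keep using. A first schema or an identical ("trivial")
   * schema is accepted; any real difference is rejected.
   */
  static List<String> accept(List<String> current, List<String> incoming)
      throws SchemaChangeException {
    if (current == null || current.equals(incoming)) {
      return incoming;
    }
    throw new SchemaChangeException("Schema changed from " + current + " to " + incoming);
  }

  public static void main(String[] args) throws SchemaChangeException {
    List<String> first = accept(null, Arrays.asList("a:INT", "b:VARCHAR"));
    accept(first, Arrays.asList("a:INT", "b:VARCHAR")); // trivial change: ignored
    try {
      accept(first, Arrays.asList("a:BIGINT", "b:VARCHAR"));
    } catch (SchemaChangeException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```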

PriorityQueueCopier

Spilling

The operator spills to disk (when? batch group size? threshold?)

Memory Management

Disk Management

Disk management is integrated into the xsort operator class itself. The algorithm is:

  • Use the HDFS FileSystem class, referenced by fs, to manage spill files. This allows spill files to be written to the DFS file system in addition to local disk, which matters when cluster nodes are configured with limited local storage and large spill files are written to DFS instead.
  • Maintain a list of spill directories (dirs) to which spill files are written (in round robin manner?)

Individual spill files have the following name:

<query-id>_majorfragment<major-fragment-id>_minorfragment<minor-fragment-id>_operator<operator-id>

The items in <...> are parameters for the particular operator.
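As a small illustration of the naming pattern above, the sketch below assembles a file name from its four parameters. The method name, the parameter types (a string query id and integer fragment and operator ids), and the sample values are assumptions made for the example; the source gives only the pattern itself.

```java
class SpillFileNameSketch {
  /** Builds a name following the pattern shown above; parameter types are assumed. */
  static String spillFileName(String queryId, int majorFragmentId,
                              int minorFragmentId, int operatorId) {
    return String.format("%s_majorfragment%d_minorfragment%d_operator%d",
        queryId, majorFragmentId, minorFragmentId, operatorId);
  }

  public static void main(String[] args) {
    // "q1" is a made-up query id used only for illustration.
    System.out.println(spillFileName("q1", 2, 5, 7));
    // prints q1_majorfragment2_minorfragment5_operator7
  }
}
```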

Inputs

Configuration Properties

  • drill.exec.sort.external.spill.group.size (member SPILL_BATCH_GROUP_SIZE)
  • drill.exec.sort.external.spill.threshold (member EXTERNAL_SORT_SPILL_THRESHOLD)
  • drill.exec.sort.external.spill.directories (member dirs)

Open Questions

  • MappingSet
  • Dir algorithm
