diff --git a/package.json b/package.json index 60c2c8ec..0de57318 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "watch:url": "NODE_ENV=development nodemon bin/cli.js https://hyperparam.blob.core.windows.net/hyperparam/starcoderdata-js-00000-of-00065.parquet" }, "dependencies": { - "hightable": "0.12.1", + "hightable": "0.13.1", "hyparquet": "1.9.1", "hyparquet-compressors": "1.0.0", "icebird": "0.1.8", diff --git a/src/components/Cell.tsx b/src/components/Cell.tsx index ecfaa479..08b4de99 100644 --- a/src/components/Cell.tsx +++ b/src/components/Cell.tsx @@ -1,3 +1,4 @@ +import { stringify } from 'hightable' import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet' import { useEffect, useState } from 'react' import type { FileSource } from '../lib/sources/types.js' @@ -75,30 +76,3 @@ export default function CellView({ source, row, col, config }: CellProps) { ) } - -/** - * Robust stringification of any value, including json and bigints. - */ -function stringify(value: unknown): string { - if (typeof value === 'string') return value - if (typeof value === 'number') return value.toLocaleString('en-US') - if (Array.isArray(value)) { - return `[\n${value.map((v) => indent(stringify(v), 2)).join(',\n')}\n]` - } - if (value === null || value === undefined) return JSON.stringify(value) - if (value instanceof Date) return value.toISOString() - if (typeof value === 'object') { - return `{${Object.entries(value) - .filter((d) => d[1] !== undefined) - .map(([k, v]) => `${k}: ${stringify(v)}`) - .join(', ')}}` - } - return '{}' -} - -function indent(text: string | undefined, spaces: number) { - return text - ?.split('\n') - .map((line) => ' '.repeat(spaces) + line) - .join('\n') -} diff --git a/src/components/viewers/CellPanel.tsx b/src/components/viewers/CellPanel.tsx index dc2dbcee..10b3f0b6 100644 --- a/src/components/viewers/CellPanel.tsx +++ b/src/components/viewers/CellPanel.tsx @@ -36,8 +36,7 @@ export default function CellPanel({ df, row, 
col, setProgress, setError, onClose if (asyncCell === undefined) { throw new Error(`Cell missing at column ${columnName}`) } - /* TODO(SL): use the same implementation of stringify, here and in Cell.tsx */ - const text = await asyncCell.then(cell => stringify(cell as unknown) ?? '{}') + const text = await asyncCell.then(stringify) setText(text) } catch (error) { setError(error as Error) diff --git a/src/components/viewers/ParquetView.tsx b/src/components/viewers/ParquetView.tsx index 3ed5e759..558eb84d 100644 --- a/src/components/viewers/ParquetView.tsx +++ b/src/components/viewers/ParquetView.tsx @@ -1,9 +1,10 @@ -import HighTable, { DataFrame, rowCache } from 'hightable' +import HighTable, { DataFrame, rowCache, stringify } from 'hightable' import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet' import React, { useCallback, useEffect, useState } from 'react' import { RoutesConfig, appendSearchParams } from '../../lib/routes.js' import { FileSource } from '../../lib/sources/types.js' import { parquetDataFrame } from '../../lib/tableProvider.js' +import styles from '../../styles/ParquetView.module.css' import { Spinner } from '../Layout.js' import CellPanel from './CellPanel.js' import ContentHeader, { ContentSize } from './ContentHeader.js' @@ -100,7 +101,10 @@ export default function ParquetView({ source, setProgress, setError, config }: V data={content.dataframe} onDoubleClickCell={onDoubleClickCell} onMouseDownCell={onMouseDownCell} - onError={setError} />} + onError={setError} + className={styles.hightable} + stringify={stringify} + />} {isLoading &&
/**
 * Read all the values of one column of a parquet file.
 *
 * The values are collected from the chunks that `parquetRead` delivers through
 * `onChunk`. Once `parquetRead` has resolved, the received row ranges are checked
 * to cover the rows 0..numRows contiguously before the array is returned.
 *
 * @param options.metadata parquet metadata; `num_rows` gives the expected number of rows
 * @param options.file buffer handed to `parquetRead`
 * @param options.column name of the column to read
 * @param options.compressors decompressors forwarded to `parquetRead`
 * @returns the column values, indexed by row number (an empty array when there are no rows)
 * @throws Error if `num_rows` is unavailable, if a chunk of another column is received,
 *   or if the received chunks do not cover every row
 */
export async function getParquetColumn({ metadata, file, column, compressors }: GetColumnOptions): Promise<unknown[]> {
  const numRows = Number(metadata?.num_rows)
  if (isNaN(numRows)) {
    throw new Error('metadata.num_rows is undefined')
  }
  if (numRows === 0) {
    return []
  }
  // stored in an object so that the assignment made inside onChunk is visible to the checks below
  const lastError: {error?: Error} = {}
  const values: unknown[] = Array(numRows).fill(undefined)
  const ranges: [number, number][] = []
  function onChunk({ columnName, columnData, rowStart, rowEnd }: ColumnData) {
    if (columnName !== column) {
      lastError.error = new Error(`unexpected column name ${columnName}`)
      // do not copy the data of another column into the result
      return
    }
    for (let i = rowStart; i < rowEnd; i++) {
      values[i] = columnData[i - rowStart]
    }
    ranges.push([rowStart, rowEnd])
  }

  // this awaits all the promises. When it returns, all the data should have already been sent using onChunk
  await parquetRead({ metadata, file, columns: [column], compressors, onChunk })

  // Do some checks before returning the data

  // check for errors
  if (lastError.error !== undefined) {
    throw lastError.error
  }

  // check for missing data (should be faster than checking for undefined values in the array)
  // sort by start, then by end, so that an empty chunk [n, n] always comes before a chunk [n, m]
  // and is not reported as a gap
  ranges.sort((a, b) => a[0] - b[0] || a[1] - b[1])
  for (let i = 0; i < ranges.length - 1; i++) {
    const range = ranges[i]
    const nextRange = ranges[i + 1]
    if (!range || !nextRange) {
      throw new Error('The ranges should not be undefined')
    }
    if (range[1] !== nextRange[0]) {
      throw new Error(`missing data between rows ${range[1]} and ${nextRange[0]}`)
    }
  }
  // ranges is empty when no chunk was received at all
  const firstRange = ranges[0]
  if (!firstRange) {
    throw new Error('The first range should not be undefined')
  }
  if (firstRange[0] !== 0) {
    throw new Error(`missing data before row ${firstRange[0]}`)
  }
  const lastRange = ranges[ranges.length - 1]
  if (!lastRange) {
    throw new Error('The last range should not be undefined')
  }
  if (lastRange[1] !== numRows) {
    throw new Error(`missing data after row ${lastRange[1]}`)
  }

  // return the values
  return values
}
/**
 * Compute the permutation that sorts the rows of a table by one or more columns.
 *
 * Each element of `orderByRanks` gives, for one sort column, the rank of every row
 * (`ranks[i]` is the rank of row `i`) and the direction to apply. The columns are
 * applied in order: a column is only consulted for rows that are tied on all the
 * previous ones. Rows tied on every column keep their original relative order,
 * because `Array.prototype.sort` is stable.
 *
 * The number of rows is taken from the `ranks` array of the first column.
 *
 * @param orderByRanks non-empty list of sort columns, the most significant first
 * @returns the sort index: `sortIndex[0]` is the index of the first row in the sorted table
 * @throws Error if `orderByRanks` is empty, or if a rank is missing for a compared row
 */
export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'descending', ranks: number[] }[]): number[] {
  const [firstColumn] = orderByRanks
  if (firstColumn === undefined) {
    throw new Error('orderByRanks should have at least one element')
  }
  // resolve every direction to a sign once, instead of once per column on every comparison
  const columns = orderByRanks.map(({ direction, ranks }) => ({
    ranks,
    sign: direction === 'ascending' ? 1 : -1,
  }))
  const numRows = firstColumn.ranks.length
  return Array
    .from({ length: numRows }, (_, i) => i)
    .sort((a, b) => {
      for (const { ranks, sign } of columns) {
        const rankA = ranks[a]
        const rankB = ranks[b]
        if (rankA === undefined || rankB === undefined) {
          throw new Error('Invalid ranks')
        }
        if (rankA < rankB) return -sign
        if (rankA > rankB) return sign
        // tie on this column: fall through to the next one
      }
      return 0
    })
}
/**
 * Get the ranks of the rows for one column, computed by the parquet worker.
 *
 * The promise is cached per column name, so that concurrent and later requests
 * share the same computation. A failed computation is removed from the cache,
 * so that the next request retries instead of receiving the same rejection forever.
 *
 * @param column name of the column
 * @returns promise of the ranks, one per row
 */
function getColumnRanks(column: string): Promise<number[]> {
  const cached = columnRanksCache.get(column)
  if (cached) {
    return cached
  }
  const columnRanks = parquetColumnRanksWorker({ from, metadata, column })
  columnRanksCache.set(column, columnRanks)
  // evict on failure; the rejection itself still reaches the callers through the returned promise
  void columnRanks.catch(() => {
    if (columnRanksCache.get(column) === columnRanks) {
      columnRanksCache.delete(column)
    }
  })
  return columnRanks
}

/**
 * Get the sort index of the table for a given ordering.
 *
 * The ranks of every column of `orderBy` are fetched (see getColumnRanks), then
 * combined with computeSortIndex. The promise is cached under the JSON
 * serialization of `orderBy`. A failed computation is removed from the cache,
 * so that the next request retries.
 *
 * @param orderBy columns and directions to sort by, the most significant first
 * @returns promise of the sort index
 */
function getSortIndex(orderBy: OrderBy): Promise<number[]> {
  const orderByKey = JSON.stringify(orderBy)
  const cached = sortCache.get(orderByKey)
  if (cached) {
    return cached
  }
  const sortIndex = Promise
    .all(orderBy.map(({ column, direction }) => getColumnRanks(column).then(ranks => ({ direction, ranks }))))
    .then(orderByRanks => computeSortIndex(orderByRanks))
  sortCache.set(orderByKey, sortIndex)
  // evict on failure; the rejection itself still reaches the callers through the returned promise
  void sortIndex.catch(() => {
    if (sortCache.get(orderByKey) === sortIndex) {
      sortCache.delete(orderByKey)
    }
  })
  return sortIndex
}
number, orderBy?: string}) { - if (orderBy) { + rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) { + if (orderBy?.length) { const numRows = end - start const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header)) diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index 96a2c68f..f51ff1ab 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -1,7 +1,8 @@ import { ColumnData, parquetQuery } from 'hyparquet' import { compressors } from 'hyparquet-compressors' +import { getParquetColumn } from '../getParquetColumn.js' import { asyncBufferFrom } from '../utils.js' -import type { ChunkMessage, ErrorMessage, IndicesMessage, ParquetReadWorkerOptions, ResultMessage } from './types.js' +import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js' function postChunkMessage ({ chunk, queryId }: ChunkMessage) { self.postMessage({ chunk, queryId }) @@ -12,35 +13,50 @@ function postResultMessage ({ result, queryId }: ResultMessage) { function postErrorMessage ({ error, queryId }: ErrorMessage) { self.postMessage({ error, queryId }) } -function postIndicesMessage ({ indices, queryId }: IndicesMessage) { - self.postMessage({ indices, queryId }) +function postColumnRanksMessage ({ columnRanks, queryId }: ColumnRanksMessage) { + self.postMessage({ columnRanks, queryId }) } -self.onmessage = async ({ data }: { - data: ParquetReadWorkerOptions & { queryId: number; chunks: boolean }; -}) => { - const { metadata, from, rowStart, rowEnd, orderBy, columns, queryId, chunks, sortIndex } = data +self.onmessage = async ({ data }: { data: ClientMessage }) => { + const { metadata, from, kind, queryId } = data const file = await asyncBufferFrom(from) - if (sortIndex === undefined) { - const onChunk = chunks ? 
(chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined + if (kind === 'columnRanks') { + const { column } = data + // return the column ranks in ascending order + // we can get the descending order replacing the rank with numRows - rank - 1. It's not exactly the rank of + // the descending order, because the rank is the first, not the last, of the ties. But it's enough for the + // purpose of sorting. + try { - const result = await parquetQuery({ metadata, file, rowStart, rowEnd, orderBy, columns, compressors, onChunk }) - postResultMessage({ result, queryId }) + const sortColumn = await getParquetColumn({ metadata, file, column, compressors }) + const valuesWithIndex = sortColumn.map((value, index) => ({ value, index })) + const sortedValuesWithIndex = Array.from(valuesWithIndex).sort(({ value: a }, { value: b }) => compare(a, b)) + const numRows = sortedValuesWithIndex.length + const columnRanks = sortedValuesWithIndex.reduce((accumulator, currentValue, rank) => { + const { lastValue, lastRank, ranks } = accumulator + const { value, index } = currentValue + if (value === lastValue) { + ranks[index] = lastRank + return { ranks, lastValue, lastRank } + } else { + ranks[index] = rank + return { ranks, lastValue: value, lastRank: rank } + } + }, { + ranks: Array(numRows).fill(-1) as number[], + lastValue: undefined as unknown, + lastRank: 0, + }).ranks + postColumnRanksMessage({ columnRanks: columnRanks, queryId }) } catch (error) { postErrorMessage({ error: error as Error, queryId }) } } else { + const { rowStart, rowEnd, chunks } = data + const onChunk = chunks ? 
(chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined try { - // Special case for sorted index - if (orderBy === undefined) - throw new Error('sortParquetWorker requires orderBy') - if (rowStart !== undefined || rowEnd !== undefined) - throw new Error('sortIndex requires all rows') - const sortColumn = await parquetQuery({ metadata, file, columns: [orderBy], compressors }) - const indices = Array.from(sortColumn, (_, index) => index).sort((a, b) => - compare(sortColumn[a]?.[orderBy], sortColumn[b]?.[orderBy]) - ) - postIndicesMessage({ indices, queryId }) + const result = await parquetQuery({ metadata, file, rowStart, rowEnd, compressors, onChunk }) + postResultMessage({ result, queryId }) } catch (error) { postErrorMessage({ error: error as Error, queryId }) } diff --git a/src/lib/workers/parquetWorkerClient.ts b/src/lib/workers/parquetWorkerClient.ts index a31a39ec..f0f683d4 100644 --- a/src/lib/workers/parquetWorkerClient.ts +++ b/src/lib/workers/parquetWorkerClient.ts @@ -2,29 +2,29 @@ import ParquetWorker from './parquetWorker?worker&inline' /// ^ the worker is bundled with the main thread code (inline) which is easier for users to import /// (no need to copy the worker file to the right place) import { ColumnData } from 'hyparquet' -import type { ParquetMessage, ParquetReadWorkerOptions, ParquetSortIndexOptions, Row, SortParquetMessage } from './types.js' +import type { Cells, ColumnRanksClientMessage, ColumnRanksWorkerMessage, ColumnRanksWorkerOptions, QueryClientMessage, QueryWorkerMessage, QueryWorkerOptions } from './types.js' let worker: Worker | undefined let nextQueryId = 0 -interface SortQueryAgent { - kind: 'sortIndex'; - resolve: (value: number[]) => void; - reject: (error: Error) => void; -} interface RowsQueryAgent { kind: 'query'; - resolve: (value: Row[]) => void; + resolve: (value: Cells[]) => void; reject: (error: Error) => void; onChunk?: (chunk: ColumnData) => void; } -type QueryAgent = SortQueryAgent | RowsQueryAgent 
+interface ColumnRanksQueryAgent { + kind: 'columnRanks'; + resolve: (value: number[]) => void; + reject: (error: Error) => void; +} +type QueryAgent = RowsQueryAgent | ColumnRanksQueryAgent const pending = new Map() function getWorker() { if (!worker) { worker = new ParquetWorker() - worker.onmessage = ({ data }: { data: ParquetMessage | SortParquetMessage }) => { + worker.onmessage = ({ data }: { data: QueryWorkerMessage | ColumnRanksWorkerMessage }) => { const pendingQueryAgent = pending.get(data.queryId) if (!pendingQueryAgent) { console.warn( @@ -32,8 +32,9 @@ function getWorker() { ) return } + if (pendingQueryAgent.kind === 'query') { - const { resolve, reject, onChunk } = pendingQueryAgent + const { onChunk, resolve, reject } = pendingQueryAgent if ('error' in data) { reject(data.error) } else if ('result' in data) { @@ -43,15 +44,16 @@ function getWorker() { } else { reject(new Error('Unexpected message from worker')) } + return + } + + const { resolve, reject } = pendingQueryAgent + if ('error' in data) { + reject(data.error) + } else if ('columnRanks' in data) { + resolve(data.columnRanks) } else { - const { resolve, reject } = pendingQueryAgent - if ('error' in data) { - reject(data.error) - } else if ('indices' in data) { - resolve(data.indices) - } else { - reject(new Error('Unexpected message from worker')) - } + reject(new Error('Unexpected message from worker')) } } } @@ -64,7 +66,8 @@ function getWorker() { * Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs * to be serialized to the worker. */ -export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, orderBy, onChunk }: ParquetReadWorkerOptions): Promise { +export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, onChunk }: QueryWorkerOptions): Promise { + // TODO(SL) Support passing columns? 
return new Promise((resolve, reject) => { const queryId = nextQueryId++ pending.set(queryId, { kind: 'query', resolve, reject, onChunk }) @@ -72,17 +75,17 @@ export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, orderBy, // If caller provided an onChunk callback, worker will send chunks as they are parsed const chunks = onChunk !== undefined - worker.postMessage({ queryId, metadata, from, rowStart, rowEnd, orderBy, chunks }) + const message: QueryClientMessage = { queryId, metadata, from, rowStart, rowEnd, chunks, kind: 'query' } + worker.postMessage(message) }) } -export function parquetSortIndexWorker({ metadata, from, orderBy }: ParquetSortIndexOptions): Promise { +export function parquetColumnRanksWorker({ metadata, from, column }: ColumnRanksWorkerOptions): Promise { return new Promise((resolve, reject) => { const queryId = nextQueryId++ - pending.set(queryId, { kind: 'sortIndex', resolve, reject }) + pending.set(queryId, { kind: 'columnRanks', resolve, reject }) const worker = getWorker() - worker.postMessage({ - queryId, metadata, from, orderBy, sortIndex: true, - }) + const message: ColumnRanksClientMessage = { queryId, metadata, from, column, kind: 'columnRanks' } + worker.postMessage(message) }) } diff --git a/src/lib/workers/types.ts b/src/lib/workers/types.ts index 52adfc25..babc0b25 100644 --- a/src/lib/workers/types.ts +++ b/src/lib/workers/types.ts @@ -1,4 +1,4 @@ -import { ColumnData, FileMetaData, ParquetReadOptions } from 'hyparquet' +import { ColumnData, FileMetaData } from 'hyparquet' // Serializable constructors for AsyncBuffers interface AsyncBufferFromFile { @@ -11,37 +11,48 @@ interface AsyncBufferFromUrl { requestInit?: RequestInit } export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl +// Cells is defined in hightable, but uses any, not unknown +export type Cells = Record ; -// Same as ParquetReadOptions, but AsyncBufferFrom instead of AsyncBuffer -export interface ParquetReadWorkerOptions extends Omit { 
+export interface CommonWorkerOptions { + metadata: FileMetaData, from: AsyncBufferFrom - orderBy?: string - sortIndex?: boolean } -// Row is defined in hightable, but not exported + we change any to unknown -export type Row = Record ; - interface Message { queryId: number } +export interface ErrorMessage extends Message { + error: Error +} + +/* Query worker */ +export interface QueryWorkerOptions extends CommonWorkerOptions { + rowStart?: number, + rowEnd?: number, + onChunk?: (chunk: ColumnData) => void +} +export interface QueryClientMessage extends QueryWorkerOptions, Message { + kind: 'query', + chunks?: boolean +} export interface ChunkMessage extends Message { chunk: ColumnData } export interface ResultMessage extends Message { - result: Row[] + result: Cells[] } -export interface IndicesMessage extends Message { - indices: number[] +export type QueryWorkerMessage = ChunkMessage | ResultMessage | ErrorMessage + +/* ColumnRanks worker */ +export interface ColumnRanksWorkerOptions extends CommonWorkerOptions { + column: string } -export interface ErrorMessage extends Message { - error: Error +export interface ColumnRanksClientMessage extends ColumnRanksWorkerOptions, Message { + kind: 'columnRanks' } - -export type ParquetMessage = ChunkMessage | ResultMessage | ErrorMessage -export type SortParquetMessage = IndicesMessage | ErrorMessage - -export interface ParquetSortIndexOptions { - metadata: FileMetaData - from: AsyncBufferFrom - orderBy: string +export interface ColumnRanksMessage extends Message { + columnRanks: number[] } +export type ColumnRanksWorkerMessage = ColumnRanksMessage | ErrorMessage + +export type ClientMessage = QueryClientMessage | ColumnRanksClientMessage diff --git a/src/styles/ParquetView.module.css b/src/styles/ParquetView.module.css new file mode 100644 index 00000000..1a0be0e1 --- /dev/null +++ b/src/styles/ParquetView.module.css @@ -0,0 +1,17 @@ +/* table overrides */ +.hightable { + /* table corner */ + thead td:first-child { + 
background: url("https://hyperparam.app/assets/table/hyperparam.svg") + #f9f4ff no-repeat center 6px; + } + /* cells */ + tbody td { + cursor: pointer; + } + /* row numbers */ + tbody tr:hover [role="rowheader"] { + background-color: #ccd; + border-right: 1px solid #ccc; + } +} diff --git a/src/styles/app.css b/src/styles/app.css index d341ac4d..18348e26 100644 --- a/src/styles/app.css +++ b/src/styles/app.css @@ -56,21 +56,6 @@ li { height: 100vh; } -/* table overrides */ -.table-container .table-corner { - background: url('https://hyperparam.app/assets/table/hyperparam.svg') #f9f4ff no-repeat center 6px; -} -main .table-container .table th { - background-color: #f1f1f3; -} -main .table td { - cursor: pointer; -} -main .table tbody tr:hover th:first-child { - background-color: #ccd; - border-right: 1px solid #ccc; -} - /* brand logo */ .brand { align-items: center;