Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"watch:url": "NODE_ENV=development nodemon bin/cli.js https://hyperparam.blob.core.windows.net/hyperparam/starcoderdata-js-00000-of-00065.parquet"
},
"dependencies": {
"hightable": "0.12.1",
"hightable": "0.13.1",
"hyparquet": "1.9.1",
"hyparquet-compressors": "1.0.0",
"icebird": "0.1.8",
Expand Down
28 changes: 1 addition & 27 deletions src/components/Cell.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { stringify } from 'hightable'
import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
import { useEffect, useState } from 'react'
import type { FileSource } from '../lib/sources/types.js'
Expand Down Expand Up @@ -75,30 +76,3 @@ export default function CellView({ source, row, col, config }: CellProps) {
</Layout>
)
}

/**
 * Robust stringification of any value, including json, bigints, and booleans.
 * Unlike JSON.stringify, this always returns a string (JSON.stringify(undefined)
 * returns the value undefined, which violated this function's declared return type).
 */
function stringify(value: unknown): string {
  if (typeof value === 'string') return value
  // numbers are rendered with thousands separators, e.g. 1234 -> "1,234"
  if (typeof value === 'number') return value.toLocaleString('en-US')
  // bug fix: bigints and booleans previously fell through to the final
  // fallback and were rendered as '{}'
  if (typeof value === 'bigint' || typeof value === 'boolean') return value.toString()
  if (Array.isArray(value)) {
    // indent each nested line by two spaces so nested structures stay readable
    const pad = (text: string) => text.split('\n').map((line) => '  ' + line).join('\n')
    return `[\n${value.map((v) => pad(stringify(v))).join(',\n')}\n]`
  }
  // bug fix: return real strings for nullish values instead of relying on
  // JSON.stringify (which returns undefined for undefined input)
  if (value === null) return 'null'
  if (value === undefined) return 'undefined'
  if (value instanceof Date) return value.toISOString()
  if (typeof value === 'object') {
    // plain objects: "{k: v, k: v}" with undefined-valued entries dropped
    return `{${Object.entries(value)
      .filter(([, v]) => v !== undefined)
      .map(([k, v]) => `${k}: ${stringify(v)}`)
      .join(', ')}}`
  }
  // remaining primitives (symbol, function) have no meaningful text form
  return '{}'
}

/** Prefix every line of `text` with `spaces` spaces; passes undefined through unchanged. */
function indent(text: string | undefined, spaces: number) {
  if (text === undefined) return undefined
  const prefix = ' '.repeat(spaces)
  const padded: string[] = []
  for (const line of text.split('\n')) {
    padded.push(prefix + line)
  }
  return padded.join('\n')
}
3 changes: 1 addition & 2 deletions src/components/viewers/CellPanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose
if (asyncCell === undefined) {
throw new Error(`Cell missing at column ${columnName}`)
}
/* TODO(SL): use the same implementation of stringify, here and in Cell.tsx */
const text = await asyncCell.then(cell => stringify(cell as unknown) ?? '{}')
const text = await asyncCell.then(stringify)
setText(text)
} catch (error) {
setError(error as Error)
Expand Down
8 changes: 6 additions & 2 deletions src/components/viewers/ParquetView.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import HighTable, { DataFrame, rowCache } from 'hightable'
import HighTable, { DataFrame, rowCache, stringify } from 'hightable'
import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
import React, { useCallback, useEffect, useState } from 'react'
import { RoutesConfig, appendSearchParams } from '../../lib/routes.js'
import { FileSource } from '../../lib/sources/types.js'
import { parquetDataFrame } from '../../lib/tableProvider.js'
import styles from '../../styles/ParquetView.module.css'
import { Spinner } from '../Layout.js'
import CellPanel from './CellPanel.js'
import ContentHeader, { ContentSize } from './ContentHeader.js'
Expand Down Expand Up @@ -100,7 +101,10 @@ export default function ParquetView({ source, setProgress, setError, config }: V
data={content.dataframe}
onDoubleClickCell={onDoubleClickCell}
onMouseDownCell={onMouseDownCell}
onError={setError} />}
onError={setError}
className={styles.hightable}
stringify={stringify}
/>}

{isLoading && <div className='center'><Spinner /></div>}
</ContentHeader>
Expand Down
65 changes: 65 additions & 0 deletions src/lib/getParquetColumn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { ColumnData, ParquetReadOptions, parquetRead } from 'hyparquet'

type GetColumnOptions = Omit<ParquetReadOptions, 'columns' | 'rowStart' | 'rowEnd' | 'onChunk' | 'onComplete'> & {column: string}

/**
 * Read a single parquet column into a plain array with one entry per row.
 * Verifies that the chunks delivered by parquetRead cover every row exactly,
 * and throws if data is missing or an unexpected column is received.
 */
export async function getParquetColumn({ metadata, file, column, compressors }: GetColumnOptions): Promise<unknown[]> {
  const rowCount = Number(metadata?.num_rows)
  if (isNaN(rowCount)) {
    throw new Error('metadata.num_rows is undefined')
  }
  if (rowCount === 0) {
    return []
  }

  let chunkError: Error | undefined
  const result: unknown[] = Array(rowCount).fill(undefined)
  const coveredRanges: [number, number][] = []

  // Copy each chunk into the result array and record which row span it covered.
  function onChunk({ columnName, columnData, rowStart, rowEnd }: ColumnData) {
    if (columnName !== column) {
      chunkError = new Error(`unexpected column name ${columnName}`)
    }
    for (let row = rowStart; row < rowEnd; row++) {
      result[row] = columnData[row - rowStart]
    }
    coveredRanges.push([rowStart, rowEnd])
  }

  // All chunks are delivered through onChunk before this promise settles.
  await parquetRead({ metadata, file, columns: [column], compressors, onChunk })

  // Surface the last error reported by onChunk, if any.
  if (chunkError !== undefined) {
    throw chunkError
  }

  // Check that the sorted ranges tile [0, rowCount) with no gaps.
  // (Cheaper than scanning the whole array for undefined holes.)
  coveredRanges.sort((a, b) => a[0] - b[0])
  for (let i = 0; i < coveredRanges.length - 1; i++) {
    const current = coveredRanges[i]
    const following = coveredRanges[i + 1]
    if (!current || !following) {
      throw new Error('The ranges should not be undefined')
    }
    if (current[1] !== following[0]) {
      throw new Error(`missing data between rows ${current[1]} and ${following[0]}`)
    }
  }
  const firstRange = coveredRanges[0]
  if (!firstRange) {
    throw new Error('The first range should not be undefined')
  }
  if (firstRange[0] !== 0) {
    throw new Error(`missing data before row ${firstRange[0]}`)
  }
  const lastRange = coveredRanges[coveredRanges.length - 1]
  if (!lastRange) {
    throw new Error('The last range should not be undefined')
  }
  if (lastRange[1] !== rowCount) {
    throw new Error(`missing data after row ${lastRange[1]}`)
  }

  return result
}
2 changes: 1 addition & 1 deletion src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ export * from './sources/index.js'
export { parquetDataFrame } from './tableProvider.js'
export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js'
export { parquetQueryWorker } from './workers/parquetWorkerClient.js'
export type { AsyncBufferFrom, Row } from './workers/types.js'
export type { AsyncBufferFrom, Cells } from './workers/types.js'
58 changes: 49 additions & 9 deletions src/lib/tableProvider.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
import { DataFrame, ResolvableRow, resolvableRow } from 'hightable'
import { DataFrame, OrderBy, ResolvableRow, resolvableRow } from 'hightable'
import { FileMetaData, parquetSchema } from 'hyparquet'
import { parquetQueryWorker, parquetSortIndexWorker } from './workers/parquetWorkerClient.js'
import { parquetColumnRanksWorker, parquetQueryWorker } from './workers/parquetWorkerClient.js'
import type { AsyncBufferFrom } from './workers/types.d.ts'

/*
 * sortIndex[0] gives the index of the first row in the sorted table
 */
export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'descending', ranks: number[] }[]): number[] {
  const firstCriterion = orderByRanks[0]
  if (firstCriterion === undefined) {
    throw new Error('orderByRanks should have at least one element')
  }
  // Start from the identity permutation [0, 1, ..., numRows - 1].
  const indexes: number[] = []
  for (let row = 0; row < firstCriterion.ranks.length; row++) {
    indexes.push(row)
  }
  // Compare rows criterion by criterion; later criteria only break ties.
  indexes.sort((left, right) => {
    for (const { direction, ranks } of orderByRanks) {
      const leftRank = ranks[left]
      const rightRank = ranks[right]
      if (leftRank === undefined || rightRank === undefined) {
        throw new Error('Invalid ranks')
      }
      if (leftRank !== rightRank) {
        const ascendingOrder = leftRank < rightRank ? -1 : 1
        return direction === 'ascending' ? ascendingOrder : -ascendingOrder
      }
    }
    return 0
  })
  return indexes
}

/**
* Convert a parquet file into a dataframe.
*/
export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame {
const { children } = parquetSchema(metadata)
const header = children.map(child => child.element.name)
const sortCache = new Map<string, Promise<number[]>>()
const columnRanksCache = new Map<string, Promise<number[]>>()
const data = new Array<ResolvableRow | undefined>(Number(metadata.num_rows))
const groups = new Array(metadata.row_groups.length).fill(false)
let groupStart = 0
Expand All @@ -34,7 +60,8 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
throw new Error(`Missing data row for index ${i}`)
}
dataRow.index.resolve(i)
const row = groupData[i - rowStart]
const j = i - rowStart
const row = groupData[j]
if (row === undefined) {
throw new Error(`Missing row in groupData for index: ${i - rowStart}`)
}
Expand All @@ -54,20 +81,33 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
}
}

function getSortIndex(orderBy: string) {
let sortIndex = sortCache.get(orderBy)
// Memoized per-column rank computation: the worker request is issued at most
// once per column, and the pending promise is shared by concurrent callers.
function getColumnRanks(column: string): Promise<number[]> {
  let columnRanks = columnRanksCache.get(column)
  if (!columnRanks) {
    // NOTE(review): presumably resolves to one rank per row, ascending order — confirm against parquetWorker
    columnRanks = parquetColumnRanksWorker({ from, metadata, column })
    columnRanksCache.set(column, columnRanks)
  }
  return columnRanks
}

// Builds (and caches) the row permutation for a multi-column sort.
// The cache key is the JSON encoding of the OrderBy array, so identical
// criteria in the same order reuse the previously computed index.
function getSortIndex(orderBy: OrderBy): Promise<number[]> {
  const orderByKey = JSON.stringify(orderBy)
  let sortIndex = sortCache.get(orderByKey)
  if (!sortIndex) {
    // Fetch the ranks for every sort column in parallel, keeping each
    // column's direction alongside its ranks for the combining step.
    const orderByRanksPromise = Promise.all(
      orderBy.map(({ column, direction }) => getColumnRanks(column).then(ranks => ({ direction, ranks })))
    )
    sortIndex = orderByRanksPromise.then(orderByRanks => computeSortIndex(orderByRanks))
    sortCache.set(orderByKey, sortIndex)
  }
  return sortIndex
}

return {
header,
numRows: Number(metadata.num_rows),
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: string}) {
if (orderBy) {
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) {
if (orderBy?.length) {
const numRows = end - start
const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header))

Expand Down
58 changes: 37 additions & 21 deletions src/lib/workers/parquetWorker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { ColumnData, parquetQuery } from 'hyparquet'
import { compressors } from 'hyparquet-compressors'
import { getParquetColumn } from '../getParquetColumn.js'
import { asyncBufferFrom } from '../utils.js'
import type { ChunkMessage, ErrorMessage, IndicesMessage, ParquetReadWorkerOptions, ResultMessage } from './types.js'
import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js'

function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
self.postMessage({ chunk, queryId })
Expand All @@ -12,35 +13,50 @@ function postResultMessage ({ result, queryId }: ResultMessage) {
/** Post an error reply for the given query back to the main thread. */
function postErrorMessage ({ error, queryId }: ErrorMessage) {
  const message = { error, queryId }
  self.postMessage(message)
}
function postIndicesMessage ({ indices, queryId }: IndicesMessage) {
self.postMessage({ indices, queryId })
/** Post the computed column ranks for the given query back to the main thread. */
function postColumnRanksMessage ({ columnRanks, queryId }: ColumnRanksMessage) {
  const message = { columnRanks, queryId }
  self.postMessage(message)
}

self.onmessage = async ({ data }: {
data: ParquetReadWorkerOptions & { queryId: number; chunks: boolean };
}) => {
const { metadata, from, rowStart, rowEnd, orderBy, columns, queryId, chunks, sortIndex } = data
self.onmessage = async ({ data }: { data: ClientMessage }) => {
const { metadata, from, kind, queryId } = data
const file = await asyncBufferFrom(from)
if (sortIndex === undefined) {
const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
if (kind === 'columnRanks') {
const { column } = data
// return the column ranks in ascending order
// we can get the descending order replacing the rank with numRows - rank - 1. It's not exactly the rank of
// the descending order, because the rank is the first, not the last, of the ties. But it's enough for the
// purpose of sorting.

try {
const result = await parquetQuery({ metadata, file, rowStart, rowEnd, orderBy, columns, compressors, onChunk })
postResultMessage({ result, queryId })
const sortColumn = await getParquetColumn({ metadata, file, column, compressors })
const valuesWithIndex = sortColumn.map((value, index) => ({ value, index }))
const sortedValuesWithIndex = Array.from(valuesWithIndex).sort(({ value: a }, { value: b }) => compare<unknown>(a, b))
const numRows = sortedValuesWithIndex.length
const columnRanks = sortedValuesWithIndex.reduce((accumulator, currentValue, rank) => {
const { lastValue, lastRank, ranks } = accumulator
const { value, index } = currentValue
if (value === lastValue) {
ranks[index] = lastRank
return { ranks, lastValue, lastRank }
} else {
ranks[index] = rank
return { ranks, lastValue: value, lastRank: rank }
}
}, {
ranks: Array(numRows).fill(-1) as number[],
lastValue: undefined as unknown,
lastRank: 0,
}).ranks
postColumnRanksMessage({ columnRanks: columnRanks, queryId })
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
}
} else {
const { rowStart, rowEnd, chunks } = data
const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
try {
// Special case for sorted index
if (orderBy === undefined)
throw new Error('sortParquetWorker requires orderBy')
if (rowStart !== undefined || rowEnd !== undefined)
throw new Error('sortIndex requires all rows')
const sortColumn = await parquetQuery({ metadata, file, columns: [orderBy], compressors })
const indices = Array.from(sortColumn, (_, index) => index).sort((a, b) =>
compare<unknown>(sortColumn[a]?.[orderBy], sortColumn[b]?.[orderBy])
)
postIndicesMessage({ indices, queryId })
const result = await parquetQuery({ metadata, file, rowStart, rowEnd, compressors, onChunk })
postResultMessage({ result, queryId })
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
}
Expand Down
Loading