Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"watch:url": "NODE_ENV=development nodemon bin/cli.js https://hyperparam.blob.core.windows.net/hyperparam/starcoderdata-js-00000-of-00065.parquet"
},
"dependencies": {
"hightable": "0.12.1",
"hightable": "0.13.0",
"hyparquet": "1.9.1",
"hyparquet-compressors": "1.0.0",
"icebird": "0.1.8",
Expand Down
28 changes: 1 addition & 27 deletions src/components/Cell.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
import { useEffect, useState } from 'react'
import type { FileSource } from '../lib/sources/types.js'
import { parquetDataFrame } from '../lib/tableProvider.js'
import { stringify } from '../lib/utils.js'
import Breadcrumb, { BreadcrumbConfig } from './Breadcrumb.js'
import Layout from './Layout.js'

Expand Down Expand Up @@ -75,30 +76,3 @@ export default function CellView({ source, row, col, config }: CellProps) {
</Layout>
)
}

/**
* Robust stringification of any value, including json and bigints.
*/
function stringify(value: unknown): string {
if (typeof value === 'string') return value
if (typeof value === 'number') return value.toLocaleString('en-US')
if (Array.isArray(value)) {
return `[\n${value.map((v) => indent(stringify(v), 2)).join(',\n')}\n]`
}
if (value === null || value === undefined) return JSON.stringify(value)
if (value instanceof Date) return value.toISOString()
if (typeof value === 'object') {
return `{${Object.entries(value)
.filter((d) => d[1] !== undefined)
.map(([k, v]) => `${k}: ${stringify(v)}`)
.join(', ')}}`
}
return '{}'
}

function indent(text: string | undefined, spaces: number) {
return text
?.split('\n')
.map((line) => ' '.repeat(spaces) + line)
.join('\n')
}
6 changes: 3 additions & 3 deletions src/components/viewers/CellPanel.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { DataFrame, stringify } from 'hightable'
import { DataFrame } from 'hightable'
import { useEffect, useState } from 'react'
import { stringify } from '../../lib/utils.js'
import ContentHeader from './ContentHeader.js'

interface ViewerProps {
Expand Down Expand Up @@ -36,8 +37,7 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose
if (asyncCell === undefined) {
throw new Error(`Cell missing at column ${columnName}`)
}
/* TODO(SL): use the same implementation of stringify, here and in Cell.tsx */
const text = await asyncCell.then(cell => stringify(cell as unknown) ?? '{}')
const text = await asyncCell.then(stringify)
setText(text)
} catch (error) {
setError(error as Error)
Expand Down
7 changes: 6 additions & 1 deletion src/components/viewers/ParquetView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import React, { useCallback, useEffect, useState } from 'react'
import { RoutesConfig, appendSearchParams } from '../../lib/routes.js'
import { FileSource } from '../../lib/sources/types.js'
import { parquetDataFrame } from '../../lib/tableProvider.js'
import { stringify } from '../../lib/utils.js'
import classes from '../../styles/ParquetView.module.css'
Comment thread
severo marked this conversation as resolved.
Outdated
import { Spinner } from '../Layout.js'
import CellPanel from './CellPanel.js'
import ContentHeader, { ContentSize } from './ContentHeader.js'
Expand Down Expand Up @@ -100,7 +102,10 @@ export default function ParquetView({ source, setProgress, setError, config }: V
data={content.dataframe}
onDoubleClickCell={onDoubleClickCell}
onMouseDownCell={onMouseDownCell}
onError={setError} />}
onError={setError}
className={classes.hightable}
stringify={stringify}
/>}

{isLoading && <div className='center'><Spinner /></div>}
</ContentHeader>
Expand Down
65 changes: 65 additions & 0 deletions src/lib/getParquetColumn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { ColumnData, ParquetReadOptions, parquetRead } from 'hyparquet'

type GetColumnOptions = Omit<ParquetReadOptions, 'columns' | 'rowStart' | 'rowEnd' | 'onChunk' | 'onComplete'> & {column: string}

export async function getParquetColumn({ metadata, file, column, compressors }: GetColumnOptions): Promise<unknown[]> {
const numRows = Number(metadata?.num_rows)
if (isNaN(numRows)) {
throw new Error('metadata.num_rows is undefined')
}
if (numRows === 0) {
return []
}
const lastError: {error?: Error} = {}
const values: unknown[] = Array(numRows).fill(undefined)
const ranges: [number, number][] = []
function onChunk({ columnName, columnData, rowStart, rowEnd }: ColumnData) {
if (columnName !== column) {
lastError.error = new Error(`unexpected column name ${columnName}`)
}
for (let i = rowStart; i < rowEnd; i++) {
values[i] = columnData[i - rowStart]
}
ranges.push([rowStart, rowEnd])
}

// this awaits all the promises. When it returns, all the data should have already been sent using onChunk
await parquetRead({ metadata, file, columns: [column], compressors, onChunk })

// Do some checks before returning the data

// check for errors
if (lastError.error !== undefined) {
throw lastError.error
}

// check for missing data (should be faster than checking for undefined values in the array)
const sortedRanges = ranges.sort((a, b) => a[0] - b[0])
for (let i = 0; i < sortedRanges.length - 1; i++) {
const range = sortedRanges[i]
const nextRange = sortedRanges[i + 1]
if (!range || !nextRange) {
throw new Error('The ranges should not be undefined')
}
if (range[1] !== nextRange[0]) {
throw new Error(`missing data between rows ${range[1]} and ${nextRange[0]}`)
}
}
const firstRange = sortedRanges[0]
if (!firstRange) {
throw new Error('The first range should not be undefined')
}
if (firstRange[0] !== 0) {
throw new Error(`missing data before row ${firstRange[0]}`)
}
const lastRange = sortedRanges[sortedRanges.length - 1]
if (!lastRange) {
throw new Error('The last range should not be undefined')
}
if (lastRange[1] !== numRows) {
throw new Error(`missing data after row ${lastRange[1]}`)
}

// return the values
return values
}
2 changes: 1 addition & 1 deletion src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ export * from './sources/index.js'
export { parquetDataFrame } from './tableProvider.js'
export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js'
export { parquetQueryWorker } from './workers/parquetWorkerClient.js'
export type { AsyncBufferFrom, Row } from './workers/types.js'
export type { AsyncBufferFrom, Cells } from './workers/types.js'
58 changes: 49 additions & 9 deletions src/lib/tableProvider.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
import { DataFrame, ResolvableRow, resolvableRow } from 'hightable'
import { DataFrame, OrderBy, ResolvableRow, resolvableRow } from 'hightable'
import { FileMetaData, parquetSchema } from 'hyparquet'
import { parquetQueryWorker, parquetSortIndexWorker } from './workers/parquetWorkerClient.js'
import { parquetColumnRanksWorker, parquetQueryWorker } from './workers/parquetWorkerClient.js'
import type { AsyncBufferFrom } from './workers/types.d.ts'

/*
* sortIndex[0] gives the index of the first row in the sorted table
*/
export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'descending', ranks: number[] }[]): number[] {
if (!(0 in orderByRanks)) {
throw new Error('orderByRanks should have at least one element')
}
const numRows = orderByRanks[0].ranks.length
return Array
.from({ length: numRows }, (_, i) => i)
.sort((a, b) => {
for (const { direction, ranks } of orderByRanks) {
const rankA = ranks[a]
const rankB = ranks[b]
if (rankA === undefined || rankB === undefined) {
throw new Error('Invalid ranks')
}
const value = direction === 'ascending' ? 1 : -1
if (rankA < rankB) return -value
if (rankA > rankB) return value
}
return 0
})
}

/**
* Convert a parquet file into a dataframe.
*/
export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame {
const { children } = parquetSchema(metadata)
const header = children.map(child => child.element.name)
const sortCache = new Map<string, Promise<number[]>>()
const columnRanksCache = new Map<string, Promise<number[]>>()
const data = new Array<ResolvableRow | undefined>(Number(metadata.num_rows))
const groups = new Array(metadata.row_groups.length).fill(false)
let groupStart = 0
Expand All @@ -34,7 +60,8 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
throw new Error(`Missing data row for index ${i}`)
}
dataRow.index.resolve(i)
const row = groupData[i - rowStart]
const j = i - rowStart
const row = groupData[j]
if (row === undefined) {
throw new Error(`Missing row in groupData for index: ${i - rowStart}`)
}
Expand All @@ -54,20 +81,33 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData):
}
}

function getSortIndex(orderBy: string) {
let sortIndex = sortCache.get(orderBy)
function getColumnRanks(column: string): Promise<number[]> {
let columnRanks = columnRanksCache.get(column)
if (!columnRanks) {
columnRanks = parquetColumnRanksWorker({ from, metadata, column })
columnRanksCache.set(column, columnRanks)
}
return columnRanks
}

function getSortIndex(orderBy: OrderBy): Promise<number[]> {
const orderByKey = JSON.stringify(orderBy)
let sortIndex = sortCache.get(orderByKey)
if (!sortIndex) {
sortIndex = parquetSortIndexWorker({ from, metadata, orderBy })
sortCache.set(orderBy, sortIndex)
const orderByRanksPromise = Promise.all(
orderBy.map(({ column, direction }) => getColumnRanks(column).then(ranks => ({ direction, ranks })))
)
sortIndex = orderByRanksPromise.then(orderByRanks => computeSortIndex(orderByRanks))
sortCache.set(orderByKey, sortIndex)
}
return sortIndex
}

return {
header,
numRows: Number(metadata.num_rows),
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: string}) {
if (orderBy) {
rows({ start, end, orderBy }: { start: number, end: number, orderBy?: OrderBy}) {
if (orderBy?.length) {
const numRows = end - start
const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header))

Expand Down
27 changes: 27 additions & 0 deletions src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,30 @@ export const contentTypes: Record<string, string> = {
}

export const imageTypes = ['.png', '.jpg', '.jpeg', '.gif', '.svg', '.tiff', '.webp']

/**
* Robust stringification of any value, including json and bigints.
*/
export function stringify(value: unknown): string {
Comment thread
severo marked this conversation as resolved.
Outdated
if (typeof value === 'string') return value
if (typeof value === 'number' || typeof value === 'bigint') return value.toLocaleString('en-US')
if (Array.isArray(value)) {
return `[\n${value.map((v) => indent(stringify(v), 2)).join(',\n')}\n]`
}
if (value === null || value === undefined) return JSON.stringify(value)
if (value instanceof Date) return value.toISOString()
if (typeof value === 'object') {
return `{${Object.entries(value)
.filter((d) => d[1] !== undefined)
.map(([k, v]) => `${k}: ${stringify(v)}`)
.join(', ')}}`
}
return '{}'
}

function indent(text: string | undefined, spaces: number) {
return text
?.split('\n')
.map((line) => ' '.repeat(spaces) + line)
.join('\n')
}
58 changes: 37 additions & 21 deletions src/lib/workers/parquetWorker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { ColumnData, parquetQuery } from 'hyparquet'
import { compressors } from 'hyparquet-compressors'
import { getParquetColumn } from '../getParquetColumn.js'
import { asyncBufferFrom } from '../utils.js'
import type { ChunkMessage, ErrorMessage, IndicesMessage, ParquetReadWorkerOptions, ResultMessage } from './types.js'
import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js'

function postChunkMessage ({ chunk, queryId }: ChunkMessage) {
self.postMessage({ chunk, queryId })
Expand All @@ -12,35 +13,50 @@ function postResultMessage ({ result, queryId }: ResultMessage) {
function postErrorMessage ({ error, queryId }: ErrorMessage) {
self.postMessage({ error, queryId })
}
function postIndicesMessage ({ indices, queryId }: IndicesMessage) {
self.postMessage({ indices, queryId })
function postColumnRanksMessage ({ columnRanks, queryId }: ColumnRanksMessage) {
self.postMessage({ columnRanks, queryId })
}

self.onmessage = async ({ data }: {
data: ParquetReadWorkerOptions & { queryId: number; chunks: boolean };
}) => {
const { metadata, from, rowStart, rowEnd, orderBy, columns, queryId, chunks, sortIndex } = data
self.onmessage = async ({ data }: { data: ClientMessage }) => {
const { metadata, from, kind, queryId } = data
const file = await asyncBufferFrom(from)
if (sortIndex === undefined) {
const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
if (kind === 'columnRanks') {
const { column } = data
// return the column ranks in ascending order
// we can get the descending order replacing the rank with numRows - rank - 1. It's not exactly the rank of
// the descending order, because the rank is the first, not the last, of the ties. But it's enough for the
// purpose of sorting.

try {
const result = await parquetQuery({ metadata, file, rowStart, rowEnd, orderBy, columns, compressors, onChunk })
postResultMessage({ result, queryId })
const sortColumn = await getParquetColumn({ metadata, file, column, compressors })
const valuesWithIndex = sortColumn.map((value, index) => ({ value, index }))
const sortedValuesWithIndex = Array.from(valuesWithIndex).sort(({ value: a }, { value: b }) => compare<unknown>(a, b))
const numRows = sortedValuesWithIndex.length
const columnRanks = sortedValuesWithIndex.reduce((accumulator, currentValue, rank) => {
const { lastValue, lastRank, ranks } = accumulator
const { value, index } = currentValue
if (value === lastValue) {
ranks[index] = lastRank
return { ranks, lastValue, lastRank }
} else {
ranks[index] = rank
return { ranks, lastValue: value, lastRank: rank }
}
}, {
ranks: Array(numRows).fill(-1) as number[],
lastValue: undefined as unknown,
lastRank: 0,
}).ranks
postColumnRanksMessage({ columnRanks: columnRanks, queryId })
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
}
} else {
const { rowStart, rowEnd, chunks } = data
const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined
try {
// Special case for sorted index
if (orderBy === undefined)
throw new Error('sortParquetWorker requires orderBy')
if (rowStart !== undefined || rowEnd !== undefined)
throw new Error('sortIndex requires all rows')
const sortColumn = await parquetQuery({ metadata, file, columns: [orderBy], compressors })
const indices = Array.from(sortColumn, (_, index) => index).sort((a, b) =>
compare<unknown>(sortColumn[a]?.[orderBy], sortColumn[b]?.[orderBy])
)
postIndicesMessage({ indices, queryId })
const result = await parquetQuery({ metadata, file, rowStart, rowEnd, compressors, onChunk })
postResultMessage({ result, queryId })
} catch (error) {
postErrorMessage({ error: error as Error, queryId })
}
Expand Down
Loading