Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSONEachRowWithProgress #334

Merged
merged 10 commits into from
Oct 5, 2024
39 changes: 39 additions & 0 deletions examples/node/select_json_each_row_with_progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { createClient } from '@clickhouse/client'
import { isProgress } from '@clickhouse/client-common'
slvrtrn marked this conversation as resolved.
Show resolved Hide resolved

/** See the format spec - https://clickhouse.com/docs/en/interfaces/formats#jsoneachrowwithprogress
* When JSONEachRowWithProgress format is used in TypeScript,
* the ResultSet should infer the final row type as `{ row: Data } | Progress`. */
type Data = { number: string }

void (async () => {
const client = createClient()
const rs = await client.query({
query: 'SELECT number FROM system.numbers LIMIT 100',
format: 'JSONEachRowWithProgress',
})

let totalRows = 0
let totalProgressRows = 0

const stream = rs.stream<Data>()
for await (const rows of stream) {
for (const row of rows) {
const decodedRow = row.json()
if (isProgress(decodedRow)) {
console.log('Got a progress row:', decodedRow)
totalProgressRows++
} else {
totalRows++
if (totalRows % 100 === 0) {
console.log('Sample row:', decodedRow)
}
}
}
}

console.log('Total rows:', totalRows)
console.log('Total progress rows:', totalProgressRows)

await client.close()
})()
27 changes: 27 additions & 0 deletions packages/client-common/__tests__/unit/clickhouse_types.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { isProgress } from '@clickhouse/client-common'

describe('ClickHouse types', () => {
it('should check if a row is progress row', async () => {
const row = {
progress: {
read_rows: '1',
read_bytes: '1',
written_rows: '1',
written_bytes: '1',
total_rows_to_read: '1',
result_rows: '1',
result_bytes: '1',
elapsed_ns: '1',
},
}
expect(isProgress(row)).toBeTruthy()
expect(isProgress({})).toBeFalsy()
expect(
isProgress({
...row,
extra: 'extra',
}),
).toBeFalsy()
expect(isProgress(null)).toBeFalsy()
})
})
16 changes: 16 additions & 0 deletions packages/client-common/src/clickhouse_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,19 @@ export interface WithClickHouseSummary {
export interface WithResponseHeaders {
response_headers: ResponseHeaders
}

/** X-ClickHouse-Summary response header and progress rows from JSONEachRowWithProgress share the same structure */
export interface Progress {
progress: ClickHouseSummary
}

/** Type guard to use with JSONEachRowWithProgress, checking if the emitted row is a progress row.
* @see https://clickhouse.com/docs/en/interfaces/formats#jsoneachrowwithprogress */
export function isProgress(row: unknown): row is Progress {
return (
row !== null &&
typeof row === 'object' &&
'progress' in row &&
Object.keys(row).length === 1
)
}
1 change: 1 addition & 0 deletions packages/client-common/src/data_formatter/formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const StreamableJSONFormats = [
'JSONCompactEachRowWithNamesAndTypes',
'JSONCompactStringsEachRowWithNames',
'JSONCompactStringsEachRowWithNamesAndTypes',
'JSONEachRowWithProgress',
] as const
export const RecordsJSONFormats = ['JSONObjectEachRow'] as const
export const SingleDocumentJSONFormats = [
Expand Down
3 changes: 3 additions & 0 deletions packages/client-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export {
export { type BaseClickHouseClientConfigOptions } from './config'
export type {
Row,
RowOrProgress,
BaseResultSet,
ResultJSONType,
RowJSONType,
Expand Down Expand Up @@ -51,7 +52,9 @@ export type {
ResponseHeaders,
WithClickHouseSummary,
WithResponseHeaders,
Progress,
} from './clickhouse_types'
export { isProgress } from './clickhouse_types'
export {
type ClickHouseSettings,
type MergeTreeSettings,
Expand Down
56 changes: 34 additions & 22 deletions packages/client-common/src/result.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import type { ResponseHeaders, ResponseJSON } from './clickhouse_types'
import type {
Progress,
ResponseHeaders,
ResponseJSON,
} from './clickhouse_types'
import type {
DataFormat,
RawDataFormat,
Expand All @@ -8,6 +12,8 @@ import type {
StreamableJSONDataFormat,
} from './data_formatter'

export type RowOrProgress<T> = { row: T } | Progress

export type ResultStream<Format extends DataFormat | unknown, Stream> =
// JSON*EachRow (except JSONObjectEachRow), CSV, TSV etc.
Format extends StreamableDataFormat
Expand All @@ -22,29 +28,35 @@ export type ResultStream<Format extends DataFormat | unknown, Stream> =
Stream

export type ResultJSONType<T, F extends DataFormat | unknown> =
// JSON*EachRow formats except JSONObjectEachRow
F extends StreamableJSONDataFormat
? T[]
: // JSON formats with known layout { data, meta, statistics, ... }
F extends SingleDocumentJSONFormat
? ResponseJSON<T>
: // JSON formats represented as a Record<string, T>
F extends RecordsJSONFormat
? Record<string, T>
: // CSV, TSV etc. - cannot be represented as JSON
F extends RawDataFormat
? never
: // happens only when Format could not be inferred from a literal
T[] | Record<string, T> | ResponseJSON<T>
// Emits either a { row: T } or an object with progress
F extends 'JSONEachRowWithProgress'
? RowOrProgress<T>[]
: // JSON*EachRow formats except JSONObjectEachRow
F extends StreamableJSONDataFormat
? T[]
: // JSON formats with known layout { data, meta, statistics, ... }
F extends SingleDocumentJSONFormat
? ResponseJSON<T>
: // JSON formats represented as a Record<string, T>
F extends RecordsJSONFormat
? Record<string, T>
: // CSV, TSV etc. - cannot be represented as JSON
F extends RawDataFormat
? never
: // happens only when Format could not be inferred from a literal
T[] | Record<string, T> | ResponseJSON<T>

export type RowJSONType<T, F extends DataFormat | unknown> =
// JSON*EachRow formats
F extends StreamableJSONDataFormat
? T
: // CSV, TSV, non-streamable JSON formats - cannot be streamed as JSON
F extends RawDataFormat | SingleDocumentJSONFormat | RecordsJSONFormat
? never
: T // happens only when Format could not be inferred from a literal
// Emits either a { row: T } or an object with progress
F extends 'JSONEachRowWithProgress'
? RowOrProgress<T>
: // JSON*EachRow formats
F extends StreamableJSONDataFormat
? T
: // CSV, TSV, non-streamable JSON formats - cannot be streamed as JSON
F extends RawDataFormat | SingleDocumentJSONFormat | RecordsJSONFormat
? never
: T // happens only when Format could not be inferred from a literal

export interface Row<
JSONType = unknown,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { ResultSet } from '../../src'
import type {
ClickHouseClient as BaseClickHouseClient,
DataFormat,
} from '@clickhouse/client-common'
import { createTableWithFields } from '@test/fixtures/table_with_fields'
import { guid } from '@test/utils'
import type { ClickHouseClient } from '../../src'
import type { ClickHouseClient, ResultSet } from '../../src'
import { createNodeTestClient } from '../utils/node_client'

/* eslint-disable @typescript-eslint/no-unused-expressions */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type ClickHouseClient } from '@clickhouse/client-common'
import { type ClickHouseClient, isProgress } from '@clickhouse/client-common'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { assertJsonValues, jsonValues } from '@test/fixtures/test_data'
import { createTestClient, guid } from '@test/utils'
Expand Down Expand Up @@ -231,6 +231,26 @@ describe('[Node.js] stream JSON formats', () => {
})
})

describe('JSONEachRowWithProgress', () => {
it('should work', async () => {
const limit = 2
const expectedProgressRowsCount = 4
const rs = await client.query({
query: `SELECT number FROM system.numbers LIMIT ${limit}`,
format: 'JSONEachRowWithProgress',
clickhouse_settings: {
max_block_size: '1', // reduce the block size, so the progress is reported more frequently
},
})
const rows = await rs.json<{ number: 'string' }>()
expect(rows.length).toEqual(limit + expectedProgressRowsCount)
expect(rows.filter((r) => !isProgress(r)) as unknown[]).toEqual([
{ row: { number: '0' } },
{ row: { number: '1' } },
])
})
})

it('does not throw if stream closes prematurely', async () => {
const stream = new Stream.Readable({
objectMode: true,
Expand Down
3 changes: 3 additions & 0 deletions packages/client-node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ export {
type ParsedColumnType,
parseColumnType,
SimpleColumnTypes,
type Progress,
isProgress,
type RowOrProgress,
} from '@clickhouse/client-common'
2 changes: 1 addition & 1 deletion packages/client-node/src/result_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class ResultSet<Format extends DataFormat | unknown>
const stream = this.stream<T>()
for await (const rows of stream) {
for (const row of rows) {
result.push(row.json())
result.push(row.json() as T)
}
}
return result as any
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ClickHouseClient, Row } from '@clickhouse/client-common'
import { isProgress } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'

describe('[Web] SELECT streaming', () => {
Expand Down Expand Up @@ -117,6 +118,24 @@ describe('[Web] SELECT streaming', () => {
])
})

it('should return objects in JSONEachRowWithProgress format', async () => {
const limit = 2
const expectedProgressRowsCount = 4
const rs = await client.query({
query: `SELECT * FROM system.numbers LIMIT ${limit}`,
format: 'JSONEachRowWithProgress',
clickhouse_settings: {
max_block_size: '1', // reduce the block size, so the progress is reported more frequently
},
})
const rows = await rs.json<{ number: string }>()
expect(rows.length).toEqual(limit + expectedProgressRowsCount)
expect(rows.filter((r) => !isProgress(r)) as unknown[]).toEqual([
{ row: { number: '0' } },
{ row: { number: '1' } },
])
})

it('returns stream of objects in JSONStringsEachRow format', async () => {
const result = await client.query({
query: 'SELECT number FROM system.numbers LIMIT 5',
Expand Down
3 changes: 3 additions & 0 deletions packages/client-web/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ export {
type ParsedColumnType,
parseColumnType,
SimpleColumnTypes,
type Progress,
isProgress,
type RowOrProgress,
} from '@clickhouse/client-common'
2 changes: 1 addition & 1 deletion packages/client-web/src/result_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class ResultSet<Format extends DataFormat | unknown>
break
}
for (const row of value) {
result.push(row.json())
result.push(row.json() as T)
}
}
return result as any
Expand Down