Skip to content

Commit

Permalink
Add Parquet format support for streaming (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Nov 16, 2023
1 parent c836bb1 commit ddfd077
Show file tree
Hide file tree
Showing 17 changed files with 189 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .docker/clickhouse/single_node_tls/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM clickhouse/clickhouse-server:23.8-alpine
FROM clickhouse/clickhouse-server:23.10-alpine
COPY .docker/clickhouse/single_node_tls/certificates /etc/clickhouse-server/certs
RUN chown clickhouse:clickhouse -R /etc/clickhouse-server/certs \
&& chmod 600 /etc/clickhouse-server/certs/* \
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 0.2.6 (Common, Node.js)

### New features

- Added [Parquet format](https://clickhouse.com/docs/en/integrations/data-formats/parquet) streaming support.
See the new examples:
[insert from a file](./examples/node/insert_file_stream_parquet.ts),
[select into a file](./examples/node/select_parquet_as_file.ts).

## 0.2.5 (Common, Node.js, Web)

### Bug fixes
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2.3'

services:
clickhouse1:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.8-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.10-alpine}'
ulimits:
nofile:
soft: 262144
Expand All @@ -19,7 +19,7 @@ services:
- './.docker/clickhouse/users.xml:/etc/clickhouse-server/users.xml'

clickhouse2:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.8-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.10-alpine}'
ulimits:
nofile:
soft: 262144
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.8'
services:
clickhouse:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.8-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.10-alpine}'
container_name: 'clickhouse-js-clickhouse-server'
ports:
- '8123:8123'
Expand Down
3 changes: 2 additions & 1 deletion examples/node/insert_file_stream_csv.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createClient } from '@clickhouse/client'
import type { Row } from '@clickhouse/client-common'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
Expand All @@ -19,7 +20,7 @@ void (async () => {
})

// contains data as 1,"foo","[1,2]"\n2,"bar","[3,4]"\n...
const filename = Path.resolve(process.cwd(), './node/resources/data.csv')
const filename = Path.resolve(cwd(), './node/resources/data.csv')

await client.insert({
table: tableName,
Expand Down
3 changes: 2 additions & 1 deletion examples/node/insert_file_stream_ndjson.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Row } from '@clickhouse/client'
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'
import split from 'split2'

Expand All @@ -20,7 +21,7 @@ void (async () => {

// contains id as numbers in JSONCompactEachRow format ["0"]\n["0"]\n...
// see also: NDJSON format
const filename = Path.resolve(process.cwd(), './node/resources/data.ndjson')
const filename = Path.resolve(cwd(), './node/resources/data.ndjson')

await client.insert({
table: tableName,
Expand Down
58 changes: 58 additions & 0 deletions examples/node/insert_file_stream_parquet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { createClient } from '@clickhouse/client'
import type { Row } from '@clickhouse/client-common'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
const client = createClient()
const tableName = 'insert_file_stream_parquet'
await client.command({
query: `DROP TABLE IF EXISTS ${tableName}`,
})
await client.command({
query: `
CREATE TABLE ${tableName}
(id UInt64, name String, sku Array(UInt8))
ENGINE MergeTree()
ORDER BY (id)
`,
})

const filename = Path.resolve(cwd(), './node/resources/data.parquet')

/*
(examples) $ pqrs cat node/resources/data.parquet
############################
File: node/resources/data.parquet
############################
{id: 0, name: [97], sku: [1, 2]}
{id: 1, name: [98], sku: [3, 4]}
{id: 2, name: [99], sku: [5, 6]}
*/

await client.insert({
table: tableName,
values: Fs.createReadStream(filename),
format: 'Parquet',
})

const rs = await client.query({
query: `SELECT * from ${tableName}`,
format: 'JSONEachRow',
})

for await (const rows of rs.stream()) {
// or just `rows.json()`
// to consume the entire response at once
rows.forEach((row: Row) => {
console.log(row.json())
})
}

await client.close()
})()
Binary file added examples/node/resources/data.parquet
Binary file not shown.
42 changes: 42 additions & 0 deletions examples/node/select_parquet_as_file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
const client = createClient()

const { stream } = await client.exec({
query: `SELECT * from system.numbers LIMIT 10 FORMAT Parquet`,
})

const filename = Path.resolve(cwd(), './node/out.parquet')
const writeStream = Fs.createWriteStream(filename)
stream.pipe(writeStream)
await new Promise((resolve) => {
stream.on('end', resolve)
})

/*
(examples) $ pqrs cat node/out.parquet
#################
File: node/out.parquet
#################
{number: 0}
{number: 1}
{number: 2}
{number: 3}
{number: 4}
{number: 5}
{number: 6}
{number: 7}
{number: 8}
{number: 9}
*/

await client.close()
})()
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"@types/uuid": "^9.0.2",
"@typescript-eslint/eslint-plugin": "^5.49.0",
"@typescript-eslint/parser": "^5.49.0",
"apache-arrow": "^14.0.1",
"eslint": "^8.32.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-prettier": "^4.2.1",
Expand All @@ -63,6 +64,7 @@
"karma-typescript": "^5.5.4",
"karma-webpack": "^5.0.0",
"lint-staged": "^13.1.0",
"parquet-wasm": "^0.6.0-beta.1",
"prettier": "2.8.3",
"sinon": "^15.2.0",
"split2": "^4.1.0",
Expand Down
Binary file not shown.
1 change: 1 addition & 0 deletions packages/client-common/src/data_formatter/formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const supportedRawFormats = [
'CustomSeparated',
'CustomSeparatedWithNames',
'CustomSeparatedWithNamesAndTypes',
'Parquet',
] as const

export type JSONDataFormat = (typeof supportedJSONFormats)[number]
Expand Down
2 changes: 1 addition & 1 deletion packages/client-common/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '0.2.5'
export default '0.2.6'
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import { fakerRU } from '@faker-js/faker'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { createTableWithFields } from '@test/fixtures/table_with_fields'
import { createTestClient, guid } from '@test/utils'
import { tableFromIPC } from 'apache-arrow'
import { Buffer } from 'buffer'
import Fs from 'fs'
import { readParquet } from 'parquet-wasm'
import split from 'split2'
import Stream from 'stream'

Expand All @@ -28,7 +31,7 @@ describe('[Node.js] streaming e2e', () => {
['2', 'c', [5, 6]],
]

it('should stream a file', async () => {
it('should stream an NDJSON file', async () => {
// contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
const filename =
'packages/client-common/__tests__/fixtures/streaming_e2e_data.ndjson'
Expand All @@ -55,6 +58,67 @@ describe('[Node.js] streaming e2e', () => {
expect(actual).toEqual(expected)
})

it('should stream a Parquet file', async () => {
const filename =
'packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet'
await client.insert({
table: tableName,
values: Fs.createReadStream(filename),
format: 'Parquet',
})

// check that the data was inserted correctly
const rs = await client.query({
query: `SELECT * from ${tableName}`,
format: 'JSONCompactEachRow',
})

const actual: unknown[] = []
for await (const rows of rs.stream()) {
rows.forEach((row: Row) => {
actual.push(row.json())
})
}
expect(actual).toEqual(expected)

// check if we can stream it back and get the output matching the input file
const stream = await client
.exec({
query: `SELECT * from ${tableName} FORMAT Parquet`,
clickhouse_settings: {
output_format_parquet_compression_method: 'none',
output_format_parquet_version: '2.6',
},
})
.then((r) => r.stream)

const parquetChunks: Buffer[] = []
for await (const chunk of stream) {
parquetChunks.push(chunk)
}

const table = tableFromIPC(
readParquet(Buffer.concat(parquetChunks)).intoIPCStream()
)
expect(table.schema.toString()).toEqual(
'Schema<{ 0: id: Uint64, 1: name: Binary, 2: sku: List<Uint8> }>'
)
const actualParquetData: unknown[] = []
const textDecoder = new TextDecoder()
table.toArray().map((v) => {
const row: Record<string, unknown> = {}
row['id'] = v.id
row['name'] = textDecoder.decode(v.name) // [char] -> String
row['sku'] = Array.from(v.sku.toArray()) // Vector -> UInt8Array -> Array
actualParquetData.push(row)
})
expect(actualParquetData).toEqual([
{ id: 0n, name: 'a', sku: [1, 2] },
{ id: 1n, name: 'b', sku: [3, 4] },
{ id: 2n, name: 'c', sku: [5, 6] },
])
})

it('should stream a stream created in-place', async () => {
await client.insert({
table: tableName,
Expand Down
2 changes: 1 addition & 1 deletion packages/client-node/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '0.2.5'
export default '0.2.6'
2 changes: 1 addition & 1 deletion packages/client-web/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '0.2.5'
export default '0.2.6'
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2019",
"target": "es2020",
"module": "commonjs",
"moduleResolution": "node",
"declaration": true,
Expand Down

0 comments on commit ddfd077

Please sign in to comment.