From ddfd07705a2979565b6a602787bb9f56b91b9f24 Mon Sep 17 00:00:00 2001
From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com>
Date: Thu, 16 Nov 2023 23:58:37 +0100
Subject: [PATCH] Add Parquet format support for streaming (#208)

---
 .docker/clickhouse/single_node_tls/Dockerfile |  2 +-
 CHANGELOG.md                                  |  9 +++
 docker-compose.cluster.yml                    |  4 +-
 docker-compose.yml                            |  2 +-
 examples/node/insert_file_stream_csv.ts       |  3 +-
 examples/node/insert_file_stream_ndjson.ts    |  3 +-
 examples/node/insert_file_stream_parquet.ts   | 58 +++++++++++++++
 examples/node/resources/data.parquet          | Bin 0 -> 831 bytes
 examples/node/select_parquet_as_file.ts       | 42 +++++++++++
 package.json                                  |  2 +
 .../fixtures/streaming_e2e_data.parquet       | Bin 0 -> 831 bytes
 .../src/data_formatter/formatter.ts           |  1 +
 packages/client-common/src/version.ts         |  2 +-
 .../integration/node_streaming_e2e.test.ts    | 66 +++++++++++++++++-
 packages/client-node/src/version.ts           |  2 +-
 packages/client-web/src/version.ts            |  2 +-
 tsconfig.json                                 |  2 +-
 17 files changed, 189 insertions(+), 11 deletions(-)
 create mode 100644 examples/node/insert_file_stream_parquet.ts
 create mode 100644 examples/node/resources/data.parquet
 create mode 100644 examples/node/select_parquet_as_file.ts
 create mode 100644 packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet

diff --git a/.docker/clickhouse/single_node_tls/Dockerfile b/.docker/clickhouse/single_node_tls/Dockerfile
index 12641cd9..52b08fee 100644
--- a/.docker/clickhouse/single_node_tls/Dockerfile
+++ b/.docker/clickhouse/single_node_tls/Dockerfile
@@ -1,4 +1,4 @@
-FROM clickhouse/clickhouse-server:23.8-alpine
+FROM clickhouse/clickhouse-server:23.10-alpine
 COPY .docker/clickhouse/single_node_tls/certificates /etc/clickhouse-server/certs
 RUN chown clickhouse:clickhouse -R /etc/clickhouse-server/certs \
     && chmod 600 /etc/clickhouse-server/certs/* \
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b8a06602..90604404 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,12 @@
+## 0.2.6 (Common, Node.js)
+
+### New features
+
+- Added [Parquet format](https://clickhouse.com/docs/en/integrations/data-formats/parquet) streaming support.
+  See the new examples:
+  [insert from a file](./examples/node/insert_file_stream_parquet.ts),
+  [select into a file](./examples/node/select_parquet_as_file.ts).
+
 ## 0.2.5 (Common, Node.js, Web)
 
 ### Bug fixes
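For orientation, the feature amounts to accepting `'Parquet'` as a streaming format in the client's existing insert and select APIs; ClickHouse itself parses and encodes the binary data server-side. A minimal sketch of the insert path, distilled from the examples added later in this patch (not part of the patch itself); `my_table` is a hypothetical table whose schema matches the file, and a local server with default credentials is assumed:

```ts
import { createClient } from '@clickhouse/client'
import Fs from 'fs'

void (async () => {
  const client = createClient() // defaults to http://localhost:8123

  // Stream a Parquet file into an existing table; the client sends the
  // bytes as-is and ClickHouse parses them on the server side.
  await client.insert({
    table: 'my_table', // hypothetical table matching the file schema
    values: Fs.createReadStream('./data.parquet'),
    format: 'Parquet',
  })

  await client.close()
})()
```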
diff --git a/docker-compose.cluster.yml b/docker-compose.cluster.yml
index 0b6555aa..cd477467 100644
--- a/docker-compose.cluster.yml
+++ b/docker-compose.cluster.yml
@@ -2,7 +2,7 @@ version: '2.3'
 
 services:
   clickhouse1:
-    image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.8-alpine}'
+    image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.10-alpine}'
     ulimits:
       nofile:
         soft: 262144
@@ -19,7 +19,7 @@ services:
       - './.docker/clickhouse/users.xml:/etc/clickhouse-server/users.xml'
 
   clickhouse2:
-    image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.8-alpine}'
+    image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.10-alpine}'
     ulimits:
       nofile:
         soft: 262144
diff --git a/docker-compose.yml b/docker-compose.yml
index d70e6841..87c019e7 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,7 @@
 version: '3.8'
 services:
   clickhouse:
-    image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.8-alpine}'
+    image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-23.10-alpine}'
     container_name: 'clickhouse-js-clickhouse-server'
     ports:
       - '8123:8123'
diff --git a/examples/node/insert_file_stream_csv.ts b/examples/node/insert_file_stream_csv.ts
index a08af8d0..41275205 100644
--- a/examples/node/insert_file_stream_csv.ts
+++ b/examples/node/insert_file_stream_csv.ts
@@ -1,6 +1,7 @@
 import { createClient } from '@clickhouse/client'
 import type { Row } from '@clickhouse/client-common'
 import Fs from 'fs'
+import { cwd } from 'node:process'
 import Path from 'path'
 
 void (async () => {
@@ -19,7 +20,7 @@ void (async () => {
   })
 
   // contains data as 1,"foo","[1,2]"\n2,"bar","[3,4]"\n...
-  const filename = Path.resolve(process.cwd(), './node/resources/data.csv')
+  const filename = Path.resolve(cwd(), './node/resources/data.csv')
 
   await client.insert({
     table: tableName,
diff --git a/examples/node/insert_file_stream_ndjson.ts b/examples/node/insert_file_stream_ndjson.ts
index 964e73d3..4556a4ae 100644
--- a/examples/node/insert_file_stream_ndjson.ts
+++ b/examples/node/insert_file_stream_ndjson.ts
@@ -1,6 +1,7 @@
 import type { Row } from '@clickhouse/client'
 import { createClient } from '@clickhouse/client'
 import Fs from 'fs'
+import { cwd } from 'node:process'
 import Path from 'path'
 import split from 'split2'
 
@@ -20,7 +21,7 @@ void (async () => {
 
   // contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
   // see also: NDJSON format
-  const filename = Path.resolve(process.cwd(), './node/resources/data.ndjson')
+  const filename = Path.resolve(cwd(), './node/resources/data.ndjson')
 
   await client.insert({
     table: tableName,
diff --git a/examples/node/insert_file_stream_parquet.ts b/examples/node/insert_file_stream_parquet.ts
new file mode 100644
index 00000000..aefb13da
--- /dev/null
+++ b/examples/node/insert_file_stream_parquet.ts
@@ -0,0 +1,58 @@
+import { createClient } from '@clickhouse/client'
+import type { Row } from '@clickhouse/client-common'
+import Fs from 'fs'
+import { cwd } from 'node:process'
+import Path from 'path'
+
+void (async () => {
+  const client = createClient()
+  const tableName = 'insert_file_stream_parquet'
+  await client.command({
+    query: `DROP TABLE IF EXISTS ${tableName}`,
+  })
+  await client.command({
+    query: `
+      CREATE TABLE ${tableName}
+      (id UInt64, name String, sku Array(UInt8))
+      ENGINE MergeTree()
+      ORDER BY (id)
+    `,
+  })
+
+  const filename = Path.resolve(cwd(), './node/resources/data.parquet')
+
+  /*
+
+  (examples) $ pqrs cat node/resources/data.parquet
+
+  ############################
+  File: node/resources/data.parquet
+  ############################
+
+  {id: 0, name: [97], sku: [1, 2]}
+  {id: 1, name: [98], sku: [3, 4]}
+  {id: 2, name: [99], sku: [5, 6]}
+
+  */
+
+  await client.insert({
+    table: tableName,
+    values: Fs.createReadStream(filename),
+    format: 'Parquet',
+  })
+
+  const rs = await client.query({
+    query: `SELECT * from ${tableName}`,
+    format: 'JSONEachRow',
+  })
+
+  for await (const rows of rs.stream()) {
+    // or just `rs.json()`
+    // to consume the entire response at once
+    rows.forEach((row: Row) => {
+      console.log(row.json())
+    })
+  }
+
+  await client.close()
+})()
diff --git a/examples/node/resources/data.parquet b/examples/node/resources/data.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..957ee9e801a34710c30019616e0b232633d2815e
GIT binary patch
literal 831
[831 bytes of base85-encoded binary data omitted]
diff --git a/examples/node/select_parquet_as_file.ts b/examples/node/select_parquet_as_file.ts
new file mode 100644
--- /dev/null
+++ b/examples/node/select_parquet_as_file.ts
@@ -0,0 +1,42 @@
+import { createClient } from '@clickhouse/client'
+import Fs from 'fs'
+import { cwd } from 'node:process'
+import Path from 'path'
+
+void (async () => {
+  const client = createClient()
+
+  const { stream } = await client.exec({
+    query: `SELECT * from system.numbers LIMIT 10 FORMAT Parquet`,
+  })
+
+  const filename = Path.resolve(cwd(), './node/out.parquet')
+  const writeStream = Fs.createWriteStream(filename)
+  stream.pipe(writeStream)
+  await new Promise((resolve) => {
+    stream.on('end', resolve)
+  })
+
+  /*
+
+  (examples) $ pqrs cat node/out.parquet
+
+  #################
+  File: node/out.parquet
+  #################
+
+  {number: 0}
+  {number: 1}
+  {number: 2}
+  {number: 3}
+  {number: 4}
+  {number: 5}
+  {number: 6}
+  {number: 7}
+  {number: 8}
+  {number: 9}
+
+  */
+
+  await client.close()
+})()
diff --git a/package.json b/package.json
index 8a831d69..70dc5e8f 100644
--- a/package.json
+++ b/package.json
@@ -48,6 +48,7 @@
     "@types/uuid": "^9.0.2",
     "@typescript-eslint/eslint-plugin": "^5.49.0",
     "@typescript-eslint/parser": "^5.49.0",
+    "apache-arrow": "^14.0.1",
     "eslint": "^8.32.0",
     "eslint-config-prettier": "^8.6.0",
     "eslint-plugin-prettier": "^4.2.1",
@@ -63,6 +64,7 @@
     "karma-typescript": "^5.5.4",
     "karma-webpack": "^5.0.0",
     "lint-staged": "^13.1.0",
+    "parquet-wasm": "^0.6.0-beta.1",
     "prettier": "2.8.3",
     "sinon": "^15.2.0",
     "split2": "^4.1.0",
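One observation about `select_parquet_as_file.ts` above: `stream.pipe(writeStream)` plus a manual `'end'` listener resolves on success but never rejects, so errors from either stream would go unnoticed. A sketch of an equivalent built on `pipeline` from `node:stream/promises`, which awaits completion and propagates failures from both sides; this is an alternative formulation, not the code shipped in the patch:

```ts
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { pipeline } from 'node:stream/promises'

void (async () => {
  const client = createClient()
  const { stream } = await client.exec({
    query: `SELECT * from system.numbers LIMIT 10 FORMAT Parquet`,
  })
  // pipeline() resolves when the write completes and rejects on errors
  // from the readable or the writable, replacing pipe() + 'end' bookkeeping.
  await pipeline(stream, Fs.createWriteStream('./node/out.parquet'))
  await client.close()
})()
```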
diff --git a/packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet b/packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..957ee9e801a34710c30019616e0b232633d2815e
GIT binary patch
literal 831
[831 bytes of base85-encoded binary data omitted; same blob as examples/node/resources/data.parquet]
diff --git a/packages/client-common/src/data_formatter/formatter.ts b/packages/client-common/src/data_formatter/formatter.ts
--- a/packages/client-common/src/data_formatter/formatter.ts
+++ b/packages/client-common/src/data_formatter/formatter.ts
@@ ... @@ const supportedRawFormats = [
+  'Parquet',
 ] as const
diff --git a/packages/client-common/src/version.ts b/packages/client-common/src/version.ts
index ce6c73e6..42fb5c8c 100644
--- a/packages/client-common/src/version.ts
+++ b/packages/client-common/src/version.ts
@@ -1 +1 @@
-export default '0.2.5'
+export default '0.2.6'
diff --git a/packages/client-common/__tests__/integration/node_streaming_e2e.test.ts b/packages/client-common/__tests__/integration/node_streaming_e2e.test.ts
--- a/packages/client-common/__tests__/integration/node_streaming_e2e.test.ts
+++ b/packages/client-common/__tests__/integration/node_streaming_e2e.test.ts
@@ ... @@
+import { tableFromIPC } from 'apache-arrow'
 import Fs from 'fs'
+import { readParquet } from 'parquet-wasm'
@@ ... @@ describe('[Node.js] streaming e2e', () => {
     ['2', 'c', [5, 6]],
   ]
 
-  it('should stream a file', async () => {
+  it('should stream an NDJSON file', async () => {
     // contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
     const filename =
       'packages/client-common/__tests__/fixtures/streaming_e2e_data.ndjson'
@@ -55,6 +58,67 @@ describe('[Node.js] streaming e2e', () => {
     expect(actual).toEqual(expected)
   })
 
+  it('should stream a Parquet file', async () => {
+    const filename =
+      'packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet'
+    await client.insert({
+      table: tableName,
+      values: Fs.createReadStream(filename),
+      format: 'Parquet',
+    })
+
+    // check that the data was inserted correctly
+    const rs = await client.query({
+      query: `SELECT * from ${tableName}`,
+      format: 'JSONCompactEachRow',
+    })
+
+    const actual: unknown[] = []
+    for await (const rows of rs.stream()) {
+      rows.forEach((row: Row) => {
+        actual.push(row.json())
+      })
+    }
+    expect(actual).toEqual(expected)
+
+    // check if we can stream it back and get the output matching the input file
+    const stream = await client
+      .exec({
+        query: `SELECT * from ${tableName} FORMAT Parquet`,
+        clickhouse_settings: {
+          output_format_parquet_compression_method: 'none',
+          output_format_parquet_version: '2.6',
+        },
+      })
+      .then((r) => r.stream)
+
+    const parquetChunks: Buffer[] = []
+    for await (const chunk of stream) {
+      parquetChunks.push(chunk)
+    }
+
+    const table = tableFromIPC(
+      readParquet(Buffer.concat(parquetChunks)).intoIPCStream()
+    )
+    expect(table.schema.toString()).toEqual(
+      'Schema<{ 0: id: Uint64, 1: name: Binary, 2: sku: List }>'
+    )
+    const actualParquetData: unknown[] = []
+    const textDecoder = new TextDecoder()
+    table.toArray().map((v) => {
+      const row: Record<string, unknown> = {}
+      row['id'] = v.id
+      row['name'] = textDecoder.decode(v.name) // [char] -> String
+      row['sku'] = Array.from(v.sku.toArray()) // Vector -> UInt8Array -> Array
+      actualParquetData.push(row)
+    })
+    expect(actualParquetData).toEqual([
+      { id: 0n, name: 'a', sku: [1, 2] },
+      { id: 1n, name: 'b', sku: [3, 4] },
+      { id: 2n, name: 'c', sku: [5, 6] },
+    ])
+  })
+
   it('should stream a stream created in-place', async () => {
     await client.insert({
       table: tableName,
diff --git a/packages/client-node/src/version.ts b/packages/client-node/src/version.ts
index ce6c73e6..42fb5c8c 100644
--- a/packages/client-node/src/version.ts
+++ b/packages/client-node/src/version.ts
@@ -1 +1 @@
-export default '0.2.5'
+export default '0.2.6'
diff --git a/packages/client-web/src/version.ts b/packages/client-web/src/version.ts
index ce6c73e6..42fb5c8c 100644
--- a/packages/client-web/src/version.ts
+++ b/packages/client-web/src/version.ts
@@ -1 +1 @@
-export default '0.2.5'
+export default '0.2.6'
diff --git a/tsconfig.json b/tsconfig.json
index d24a536c..20f42aa2 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -1,6 +1,6 @@
 {
   "compilerOptions": {
-    "target": "es2019",
+    "target": "es2020",
     "module": "commonjs",
"moduleResolution": "node", "declaration": true,