Skip to content

Commit

Permalink
feat(pgmq): supports headers for pgmq v1.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
waitingsong committed Jan 9, 2025
1 parent 8e347c7 commit adc36da
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 12 deletions.
8 changes: 4 additions & 4 deletions packages/pgmq-js/src/lib/msg-manager/msg-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ export class MsgManager {
}

protected async _send<T extends MsgContent>(options: SendOptions<T>): Promise<MsgId[]> {
const { queue, msg, delay = 0, trx } = options
const { queue, msg, headers, delay = 0, trx } = options

await assertWithTrx(typeof msg === 'object', 'msg must be object', trx)
const query = typeof delay === 'number' ? MsgSql.send : MsgSql.send2
const res = await this.execute<QueryResponse<SendResp>>(query, [queue, msg, delay], trx)
const res = await this.execute<QueryResponse<SendResp>>(query, [queue, msg, headers ?? null, delay], trx)
const [row] = res.rows
await assertWithTrx(row, 'send failed', trx)
assert(row)
Expand Down Expand Up @@ -97,10 +97,10 @@ export class MsgManager {
}

protected async _sendBatch<T extends MsgContent>(options: SendBatchOptions<T>): Promise<MsgId[]> {
const { queue, msgs, delay = 0, trx } = options
const { queue, msgs, headers, delay = 0, trx } = options

const query = typeof delay === 'number' ? MsgSql.sendBatch : MsgSql.sendBatch2
const res = await this.execute<QueryResponse<SendBatchResp>>(query, [queue, msgs, delay], trx)
const res = await this.execute<QueryResponse<SendBatchResp>>(query, [queue, msgs, headers ?? null, delay], trx)
const ret = res.rows.map(row => row.send_batch)
return ret
}
Expand Down
1 change: 1 addition & 0 deletions packages/pgmq-js/src/lib/msg-manager/msg.helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export function parseMessage(input: RecordSnakeKeys<Message>): Message {
const ret: Message = {
msgId: input.msg_id,
message: input.message,
headers: input.headers,
enqueuedAt: input.enqueued_at,
readCt: input.read_ct,
vt: input.vt,
Expand Down
8 changes: 4 additions & 4 deletions packages/pgmq-js/src/lib/msg-manager/msg.sql.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

export enum MsgSql {
send = 'SELECT * FROM pgmq.send(?, ?, ?::int4)',
send2 = 'SELECT * FROM pgmq.send(?, ?, ?::timestamp)',
sendBatch = 'SELECT * FROM pgmq.send_batch(?, ?::jsonb[], ?::int4)',
sendBatch2 = 'SELECT * FROM pgmq.send_batch(?, ?::jsonb[], ?::timestamp)',
send = 'SELECT * FROM pgmq.send(?, ?, ?, ?::int4)',
send2 = 'SELECT * FROM pgmq.send(?, ?, ?, ?::timestamp)',
sendBatch = 'SELECT * FROM pgmq.send_batch(?, ?::jsonb[], ?::jsonb[], ?::int4)',
sendBatch2 = 'SELECT * FROM pgmq.send_batch(?, ?::jsonb[], ?::jsonb[], ?::timestamp)',
read = 'SELECT * FROM pgmq.read(?, ?, ?)',
read2 = 'SELECT * FROM pgmq.read(?, ?, ?, ?)',
pop = 'SELECT * FROM pgmq.pop(?)',
Expand Down
12 changes: 8 additions & 4 deletions packages/pgmq-js/src/lib/msg-manager/msg.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@ export type MsgId = string // bigint as string

export type MsgContent = object | null

export interface Message<T extends MsgContent = MsgContent> {
export interface Message<T extends MsgContent = MsgContent, H extends MsgContent = MsgContent> {
msgId: MsgId
message: T
enqueuedAt: Date
/** read count number */
readCt: number
vt: Date
headers: H
}

export interface MessageDto<T extends MsgContent = MsgContent> {
export interface MessageDto<T extends MsgContent = MsgContent, H extends MsgContent = MsgContent> {
msgId: MsgId
message: T
enqueuedAt: string
/** read count number */
readCt: number
vt: string
headers: H
}


Expand All @@ -35,26 +37,28 @@ type TimeStampStr = string
* @link https://tembo-io.github.io/pgmq/api/sql/functions/#send
* @param delay Time in seconds before the message becomes visible. Defaults to 0.
*/
export interface SendOptions<T extends MsgContent = MsgContent> extends OptionsBase {
export interface SendOptions<T extends MsgContent = MsgContent, H extends MsgContent = MsgContent> extends OptionsBase {
queue: string | string[]
msg: T
/**
* @default 0
*/
delay?: number | TimeStampStr
headers?: H
}

/**
* Send multiple messages to the queue or queues (without creating a route)
* @param delay Time in seconds before the message becomes visible. Defaults to 0.
*/
export interface SendBatchOptions<T extends MsgContent = MsgContent> extends OptionsBase {
export interface SendBatchOptions<T extends MsgContent = MsgContent, H extends MsgContent = MsgContent> extends OptionsBase {
queue: string | string[]
msgs: T[]
/**
* @default 0
*/
delay?: number | TimeStampStr
headers?: H[]
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import assert from 'node:assert'

import { fileShortPath } from '@waiting/shared-core'

import { type QueueOptionsBase, type SendOptions, Pgmq, genRandomName } from '##/index.js'
import { dbConfig } from '#@/config.unittest.js'


const rndString = genRandomName(6)

describe(fileShortPath(import.meta.url), () => {
let mq: Pgmq
const msg = {
foo: 'bar',
}
const options: SendOptions = {
queue: rndString,
msg,
headers: { barz: 'bar' },
}
const createOpts: QueueOptionsBase = { queue: rndString }

before(async () => {
mq = new Pgmq('test', dbConfig)
await mq.queue.createUnlogged(createOpts)
})
after(async () => {
await mq.queue.drop(createOpts)
await mq.destroy()
})

it(`msg.send(${rndString}), msg, headers`, async () => {
const msgIds = await mq.msg.send(options)
assert(msgIds.length === 1, 'send failed')
assert(msgIds[0] === '1', 'send failed')
})
})

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import assert from 'node:assert'

import { fileShortPath } from '@waiting/shared-core'

import { type QueueOptionsBase, type SendBatchOptions, Pgmq, genRandomName } from '##/index.js'
import { dbConfig } from '#@/config.unittest.js'


const rndString = genRandomName(6)

describe(fileShortPath(import.meta.url), () => {
let mq: Pgmq
const headers1 = { key: 'k1' }
const headers2 = { key: 'k2' }
const msg = {
foo: 'bar',
}
const opts: SendBatchOptions = {
queue: rndString,
msgs: [msg, msg],
headers: [headers1, headers2],
}
const createOpts: QueueOptionsBase = { queue: rndString }

before(async () => {
mq = new Pgmq('test', dbConfig)
await mq.queue.createUnlogged(createOpts)
})
after(async () => {
await mq.queue.drop(createOpts)
await mq.destroy()
})

it(`msg.sendBatch(${rndString}, msg[])`, async () => {
const msgIds = await mq.msg.sendBatch(opts)
assert(msgIds.length === 2, 'sendBatch failed')
assert(msgIds[0] === '1', `sendBatch failed: ${msgIds[0]}`)
assert(msgIds[1] === '2', `sendBatch failed: ${msgIds[1]}`)
})

})

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import assert from 'node:assert'

import { fileShortPath } from '@waiting/shared-core'

import type { Message, QueueOptionsBase, ReadOptions, SendOptions } from '##/index.js'
import { Pgmq, genRandomName } from '##/index.js'
import { dbConfig } from '#@/config.unittest.js'


const rndString = genRandomName(6)
const msgToSend = {
foo: 'bar',
rnd: rndString,
}

describe(fileShortPath(import.meta.url), () => {
let mq: Pgmq
const opts: ReadOptions = {
queue: rndString,
}
const createOpts: QueueOptionsBase = { queue: rndString }
const headers = { key: rndString }

before(async () => {
const sendOpts: SendOptions = {
queue: rndString,
msg: msgToSend,
headers,
}
mq = new Pgmq('test', dbConfig)
await mq.queue.createUnlogged(createOpts)
await mq.msg.send(sendOpts)
})
after(async () => {
await mq.queue.drop(createOpts)
await mq.destroy()
})

it(`msg.read(${rndString}) headers`, async () => {
const msg: Message | null = await mq.msg.read(opts)
assert(msg)
assert(msg.msgId === '1')
assert(msg.message, 'msg.message not exist')
assert.deepStrictEqual(msg.message, msgToSend, 'msg.message not equal')
assert.deepStrictEqual(msg.headers, headers, 'msg.headers not equal, msg.headers: ' + JSON.stringify(msg.headers))

assert(msg.enqueuedAt instanceof Date, 'msg.enqueuedAt not exist')
assert(msg.enqueuedAt.getTime() > 0, 'msg.enqueuedAt invalid')

assert(msg.readCt === 1, 'msg.readCt not equal 1')

assert(msg.vt instanceof Date, 'msg.vt not exist')
assert(msg.vt.getTime() > 0, 'msg.vt invalid')
})
})

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import assert from 'node:assert'

import { fileShortPath } from '@waiting/shared-core'

import { type QueueOptionsBase, type ReadBatchOptions, type SendBatchOptions, Pgmq, genRandomName } from '##/index.js'
import { dbConfig } from '#@/config.unittest.js'


const rndString = genRandomName(6)
const msgToSend = {
foo: 'bar',
rnd: rndString,
}

describe(fileShortPath(import.meta.url), () => {
let mq: Pgmq
const headers1 = { key: 'k1' }
const options: ReadBatchOptions = {
queue: rndString,
vt: 0,
qty: 3,
}
const createOpts: QueueOptionsBase = { queue: rndString }

before(async () => {
const sendOpts: SendBatchOptions = {
queue: rndString,
msgs: [msgToSend, msgToSend],
headers: [headers1],
}
mq = new Pgmq('test', dbConfig)
await mq.queue.createUnlogged(createOpts)
await mq.msg.sendBatch(sendOpts)
})
after(async () => {
await mq.queue.drop(createOpts)
await mq.destroy()
})

it(`msg.readBatch(${rndString}) [headers, null]`, async () => {
const msgs = await mq.msg.readBatch(options)
assert(msgs.length === 2, 'msgs.length not equal 2')

const [msg1, msg2] = msgs
assert(msg1)
assert(msg1.msgId === '1')
assert.deepStrictEqual(msg1.message, msgToSend, 'msg.message not equal')
assert.deepStrictEqual(msg1.headers, headers1, 'msg.headers not equal, msg.headers: ' + JSON.stringify(msg1.headers))

assert(msg1.enqueuedAt instanceof Date, 'msg.enqueuedAt not exist')
assert(msg1.enqueuedAt.getTime() > 0, 'msg.enqueuedAt invalid')

assert(msg1.readCt === 1, 'msg.readCt not equal 1')

assert(msg1.vt instanceof Date, 'msg.vt not exist')
assert(msg1.vt.getTime() > 0, 'msg.vt invalid')

assert(msg2)
assert(msg2.msgId === '2')
assert.deepStrictEqual(msg2.message, msgToSend, 'msg.message not equal')
assert(msg2.headers === null, 'msg.headers not equal, msg.headers: ' + JSON.stringify(msg2.headers))
})
})

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import assert from 'node:assert'

import { fileShortPath } from '@waiting/shared-core'

import { type QueueOptionsBase, type ReadBatchOptions, type SendBatchOptions, Pgmq, genRandomName } from '##/index.js'
import { dbConfig } from '#@/config.unittest.js'


const rndString = genRandomName(6)
const msgToSend = {
foo: 'bar',
rnd: rndString,
}

describe(fileShortPath(import.meta.url), () => {
let mq: Pgmq
const headers1 = { key: 'k1' }
const headers2 = { key: 'k2' }
const options: ReadBatchOptions = {
queue: rndString,
vt: 0,
qty: 3,
}
const createOpts: QueueOptionsBase = { queue: rndString }

before(async () => {
const sendOpts: SendBatchOptions = {
queue: rndString,
msgs: [msgToSend, msgToSend],
headers: [headers1, headers2],
}
mq = new Pgmq('test', dbConfig)
await mq.queue.createUnlogged(createOpts)
await mq.msg.sendBatch(sendOpts)
})
after(async () => {
await mq.queue.drop(createOpts)
await mq.destroy()
})

it(`msg.readBatch(${rndString}) headers`, async () => {
const msgs = await mq.msg.readBatch(options)
assert(msgs.length === 2, 'msgs.length not equal 2')

const [msg1, msg2] = msgs
assert(msg1)
assert(msg1.msgId === '1')
assert.deepStrictEqual(msg1.message, msgToSend, 'msg.message not equal')
assert.deepStrictEqual(msg1.headers, headers1, 'msg.headers not equal, msg.headers: ' + JSON.stringify(msg1.headers))

assert(msg1.enqueuedAt instanceof Date, 'msg.enqueuedAt not exist')
assert(msg1.enqueuedAt.getTime() > 0, 'msg.enqueuedAt invalid')

assert(msg1.readCt === 1, 'msg.readCt not equal 1')

assert(msg1.vt instanceof Date, 'msg.vt not exist')
assert(msg1.vt.getTime() > 0, 'msg.vt invalid')

assert(msg2)
assert(msg2.msgId === '2')
assert.deepStrictEqual(msg2.message, msgToSend, 'msg.message not equal')
assert.deepStrictEqual(msg2.headers, headers2, 'msg.headers not equal, msg.headers: ' + JSON.stringify(msg2.headers))
})
})

0 comments on commit adc36da

Please sign in to comment.