Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow movable workerData #27

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MessagePort } from 'worker_threads'
import type { MessagePort, TransferListItem } from 'worker_threads'

export interface StartupMessage {
filename: string | null
Expand Down Expand Up @@ -33,7 +33,7 @@ export const kValue = Symbol.for('Tinypool.valueOf')
export const kQueueOptions = Symbol.for('Tinypool.queueOptions')

// True if the object implements the Transferable interface
export function isTransferable(value: any): boolean {
export function isTransferable(value: any): value is Transferable {
return (
value != null &&
typeof value === 'object' &&
Expand All @@ -58,8 +58,9 @@ export function markMovable(value: object): void {
}

export interface Transferable {
readonly [kTransferable]: object
readonly [kTransferable]: TransferListItem
readonly [kValue]: object
readonly [kMovable]?: boolean
}

export interface Task {
Expand Down
121 changes: 67 additions & 54 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
MessageChannel,
MessagePort,
receiveMessageOnPort,
TransferListItem,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out worker_threads exports a TransferListItem type for us, so we don't have to extract it from MessagePort.postMessage anymore :)

} from 'worker_threads'
import { once } from 'events'
import EventEmitterAsyncResource from './EventEmitterAsyncResource'
Expand All @@ -28,7 +29,6 @@ import {
kQueueOptions,
isTransferable,
markMovable,
isMovable,
kTransferable,
kValue,
} from './common'
Expand Down Expand Up @@ -159,14 +159,14 @@ const kDefaultOptions: FilledOptions = {
}

interface RunOptions {
transferList?: TransferList
transferList?: TransferListItem[]
filename?: string | null
signal?: AbortSignalAny | null
name?: string | null
}

interface FilledRunOptions extends RunOptions {
transferList: TransferList | never
transferList: TransferListItem[] | undefined
filename: string | null
signal: AbortSignalAny | null
name: string | null
Expand All @@ -179,17 +179,17 @@ const kDefaultRunOptions: FilledRunOptions = {
name: null,
}

class DirectlyTransferable implements Transferable {
#value: object
constructor(value: object) {
class DirectlyTransferable<T extends TransferListItem> implements Transferable {
#value: T
constructor(value: T) {
this.#value = value
}

get [kTransferable](): object {
get [kTransferable](): T {
return this.#value
}

get [kValue](): object {
get [kValue](): T {
return this.#value
}
}
Expand All @@ -200,27 +200,18 @@ class ArrayBufferViewTransferable implements Transferable {
this.#view = view
}

get [kTransferable](): object {
get [kTransferable](): ArrayBufferLike {
return this.#view.buffer
}

get [kValue](): object {
get [kValue](): ArrayBufferView {
return this.#view
}
}

let taskIdCounter = 0

type TaskCallback = (err: Error, result: any) => void
// Grab the type of `transferList` off `MessagePort`. At the time of writing,
// only ArrayBuffer and MessagePort are valid, but let's avoid having to update
// our types here every time Node.js adds support for more objects.
type TransferList = MessagePort extends {
postMessage(value: any, transferList: infer T): any
}
? T
: never
type TransferListItem = TransferList extends (infer T)[] ? T : never

function maybeFileURLToPath(filename: string): string {
return filename.startsWith('file:')
Expand All @@ -233,7 +224,7 @@ function maybeFileURLToPath(filename: string): string {
class TaskInfo extends AsyncResource implements Task {
callback: TaskCallback
task: any
transferList: TransferList
transferList: TransferListItem[]
filename: string
name: string
taskId: number
Expand All @@ -245,7 +236,7 @@ class TaskInfo extends AsyncResource implements Task {

constructor(
task: any,
transferList: TransferList,
transferList: TransferListItem[],
filename: string,
name: string,
callback: TaskCallback,
Expand All @@ -254,23 +245,8 @@ class TaskInfo extends AsyncResource implements Task {
) {
super('Tinypool.Task', { requireManualDestroy: true, triggerAsyncId })
this.callback = callback
this.task = task
this.task = fillTransferList(task, transferList)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR also adds nested movable support to the pool.run method. ☝️

Note that only 1 level of nesting is checked. In other words, doing this 👇 is not supported:

pool.run({ a: { b: Tinypool.move(b) } })

this.transferList = transferList

// If the task is a Transferable returned by
// Tinypool.move(), then add it to the transferList
// automatically
if (isMovable(task)) {
// This condition should never be hit but typescript
// complains if we dont do the check.
/* istanbul ignore if */
if (this.transferList == null) {
this.transferList = []
}
this.transferList = this.transferList.concat(task[kTransferable])
this.task = task[kValue]
}

this.filename = filename
this.name = name
this.taskId = taskIdCounter++
Expand Down Expand Up @@ -599,15 +575,17 @@ class ThreadPool {
}

_addNewWorker(): void {
const pool = this
const transferList: TransferListItem[] = []
const workerData = fillTransferList(this.options.workerData, transferList)

const __dirname = dirname(fileURLToPath(import.meta.url))
const worker = new Worker(resolve(__dirname, './worker.js'), {
env: this.options.env,
argv: this.options.argv,
execArgv: this.options.execArgv,
resourceLimits: this.options.resourceLimits,
workerData: this.options.workerData,
workerData,
transferList,
trackUnmanagedFds: this.options.trackUnmanagedFds,
})

Expand All @@ -619,19 +597,21 @@ class ThreadPool {
const taskInfo = workerInfo.taskInfos.get(taskId)
workerInfo.taskInfos.delete(taskId)

if (!this.options.isolateWorkers) pool.workers.maybeAvailable(workerInfo)
if (!this.options.isolateWorkers) {
this.workers.maybeAvailable(workerInfo)
}

/* istanbul ignore if */
if (taskInfo === undefined) {
const err = new Error(
`Unexpected message from Worker: ${inspect(message)}`
)
pool.publicInterface.emit('error', err)
this.publicInterface.emit('error', err)
} else {
taskInfo.done(message.error, result)
}

pool._processPendingMessages()
this._processPendingMessages()
}

const { port1, port2 } = new MessageChannel()
Expand Down Expand Up @@ -1002,25 +982,18 @@ class Tinypool extends EventEmitterAsyncResource {
return version
}

static move(
val:
| Transferable
| TransferListItem
| ArrayBufferView
| ArrayBuffer
| MessagePort
) {
static move<T>(val: T | Transferable): T {
if (val != null && typeof val === 'object' && typeof val !== 'function') {
if (!isTransferable(val)) {
if ((types as any).isArrayBufferView(val)) {
val = new ArrayBufferViewTransferable(val as ArrayBufferView)
if (types.isArrayBufferView(val)) {
val = new ArrayBufferViewTransferable(val)
} else {
val = new DirectlyTransferable(val)
val = new DirectlyTransferable(val as any)
}
}
markMovable(val)
}
return val
return val as T
}

static get transferableSymbol() {
Expand All @@ -1036,6 +1009,46 @@ class Tinypool extends EventEmitterAsyncResource {
}
}

/**
* Handle transferable `data` or transferable properties of `data` by unwrapping
* them and adding their transferable objects to the given `transferList` array.
*/
function fillTransferList(data: any, transferList: TransferListItem[]): any {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the star of the show. The rest is mainly TypeScript improvements.

if (isTransferable(data)) {
if (!transferList.includes(data[kTransferable])) {
transferList.push(data[kTransferable])
}
return data[kValue]
}
if (data && data.constructor === Object) {
let cloned = false
for (const key of Object.getOwnPropertyNames(data)) {
const value = data[key]
if (isTransferable(value)) {
if (!cloned) {
cloned = true
// Avoid calling getters again by not using {...data}
data = Object.defineProperties(
{},
Object.getOwnPropertyDescriptors(data)
)
}
// Overwrite getters in the cloned data.
Object.defineProperty(data, key, {
value: value[kValue],
configurable: true,
enumerable: true,
writable: true,
})
if (!transferList.includes(value[kTransferable])) {
transferList.push(value[kTransferable])
}
}
}
}
return data
}

export * from './common'
export { Tinypool, Options }
export default Tinypool