Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(loader-utils): Consolidate parseWithWorker with processOnWorker #1564

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable no-restricted-globals */

import {createWorker} from '@loaders.gl/worker-utils'
import {WorkerBody} from '@loaders.gl/worker-utils';
// import {validateLoaderVersion} from './validate-loader-version';

Expand All @@ -10,79 +10,33 @@ let requestId = 0;
* @param loader
*/
export function createLoaderWorker(loader: any) {
// Check that we are actually in a worker thread
if (typeof self === 'undefined') {
return;
}

WorkerBody.onmessage = async (type, payload) => {
switch (type) {
case 'process':
try {
// validateLoaderVersion(loader, data.source.split('@')[1]);

const {input, options = {}} = payload;

const result = await parseData({
loader,
arrayBuffer: input,
options,
context: {
parse: parseOnMainThread
}
});
WorkerBody.postMessage('done', {result});
} catch (error) {
const message = error instanceof Error ? error.message : '';
WorkerBody.postMessage('error', {error: message});
}
break;
default:
}
};
}

function parseOnMainThread(arrayBuffer, options = {}) {
return new Promise((resolve, reject) => {
const id = requestId++;

/**
*/
const onMessage = (type, payload) => {
if (payload.id !== id) {
// not ours
return;
createWorker(parseOnWorker)
async function parseOnWorker(input: any, options: {[key: string]: any}, processOnMainThread): Promise<any> {
// validateLoaderVersion(loader, data.source.split('@')[1]);

const result = await parseData({
loader,
arrayBuffer: input,
options,
context: {
parse: processOnMainThread
}
});

switch (type) {
case 'done':
WorkerBody.removeEventListener(onMessage);
resolve(payload.result);
break;

case 'error':
WorkerBody.removeEventListener(onMessage);
reject(payload.error);
break;

default:
// ignore
}
};

WorkerBody.addEventListener(onMessage);

// Ask the main thread to decode data
const payload = {id, input: arrayBuffer, options};
WorkerBody.postMessage('process', payload);
});
return result;
}
}

// TODO - Support byteOffset and byteLength (enabling parsing of embedded binaries without copies)
// TODO - Why not support async loader.parse* funcs here?
// TODO - Why not reuse a common function instead of reimplementing loader.parse* selection logic? Keeping loader small?
// TODO - Lack of appropriate parser functions can be detected when we create worker, no need to wait until parse
async function parseData({loader, arrayBuffer, options, context}) {
async function parseData({
loader,
arrayBuffer,
options,
context
}) {
let data;
let parser;
if (loader.parseSync || loader.parse) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import type {WorkerJob, WorkerMessageType, WorkerMessagePayload} from '@loaders.gl/worker-utils';
import type {Loader, LoaderOptions, LoaderContext} from '../../types';
import {WorkerFarm, getWorkerURL} from '@loaders.gl/worker-utils';
import {canProcessOnWorker, processOnWorker} from '@loaders.gl/worker-utils';
import parseToNodeImage from '@loaders.gl/images/lib/parsers/parse-to-node-image';

/**
* Determines if a loader can parse with worker
* @param loader
* @param options
*/
export function canParseWithWorker(loader: Loader, options?: LoaderOptions) {
if (!WorkerFarm.isSupported()) {
if (canProcessOnWorker(loader, options)) {
return false;
}

Expand All @@ -23,69 +24,7 @@ export async function parseWithWorker(
loader: Loader,
data,
options?: LoaderOptions,
context?: LoaderContext,
parseOnMainThread?: Function
context?: LoaderContext
) {
const name = loader.id; // TODO
const url = getWorkerURL(loader, options);

const workerFarm = WorkerFarm.getWorkerFarm(options);
const workerPool = workerFarm.getWorkerPool({name, url});

// options.log object contains functions which cannot be transferred
// TODO - decide how to handle logging on workers
options = JSON.parse(JSON.stringify(options));

const job = await workerPool.startJob(
'process-on-worker',
onMessage.bind(null, parseOnMainThread)
);

job.postMessage('process', {
// @ts-ignore
input: data,
options
});

const result = await job.result;
return await result.result;
}

/**
* Handle worker's responses to the main thread
* @param job
* @param type
* @param payload
*/
async function onMessage(
parseOnMainThread,
job: WorkerJob,
type: WorkerMessageType,
payload: WorkerMessagePayload
) {
switch (type) {
case 'done':
job.done(payload);
break;

case 'error':
job.error(payload.error);
break;

case 'process':
// Worker is asking for main thread to parseO
const {id, input, options} = payload;
try {
const result = await parseOnMainThread(input, options);
job.postMessage('done', {id, result});
} catch (error) {
const message = error instanceof Error ? error.message : 'unknown error';
job.postMessage('error', {id, error: message});
}
break;

default:
// eslint-disable-next-line
console.warn(`parse-with-worker unknown message ${type}`);
}
processOnWorker(loader, data, options, parseOnMainThread);
}