Skip to content

Commit

Permalink
Move geojson worker source to be more async like.
Browse files Browse the repository at this point in the history
  • Loading branch information
HarelM committed Oct 27, 2023
1 parent 6c0212b commit f5b4206
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 133 deletions.
84 changes: 38 additions & 46 deletions src/source/geojson_worker_source.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,7 @@ describe('resourceTiming', () => {
window.performance.getEntriesByName = jest.fn().mockReturnValue([exampleResourceTiming]);

const layerIndex = new StyleLayerIndex(layers);
const source = new GeoJSONWorkerSource(actor, layerIndex, [], (params, callback) => {
callback(null, geoJson);
return {cancel: () => {}};
});
const source = new GeoJSONWorkerSource(actor, layerIndex, [], () => Promise.resolve(geoJson));

source.loadData({source: 'testSource', request: {url: 'http://localhost/nonexistent', collectResourceTiming: true}} as LoadGeoJSONParameters)
.then((result) => {
Expand Down Expand Up @@ -149,10 +146,7 @@ describe('resourceTiming', () => {
jest.spyOn(perf, 'clearMeasures').mockImplementation(() => { return null; });

const layerIndex = new StyleLayerIndex(layers);
const source = new GeoJSONWorkerSource(actor, layerIndex, [], (params, callback) => {
callback(null, geoJson);
return {cancel: () => {}};
});
const source = new GeoJSONWorkerSource(actor, layerIndex, [], () => Promise.resolve(geoJson));

source.loadData({source: 'testSource', request: {url: 'http://localhost/nonexistent', collectResourceTiming: true}} as LoadGeoJSONParameters)
.then((result) => {
Expand Down Expand Up @@ -219,53 +213,51 @@ describe('loadData', () => {

const layerIndex = new StyleLayerIndex(layers);
function createWorker() {
const worker = new GeoJSONWorkerSource(actor, layerIndex, []);

// Making the call to loadGeoJSON asynchronous
// allows these tests to mimic a message queue building up
// (regardless of timing)
const originalLoadGeoJSON = worker.loadGeoJSON;
worker.loadGeoJSON = function(params, callback) {
const timeout = setTimeout(() => {
originalLoadGeoJSON(params, callback);
}, 0);

return {cancel: () => clearTimeout(timeout)};
};
return worker;
return new GeoJSONWorkerSource(actor, layerIndex, []);
}

test('abandons previous callbacks', done => {
test('abandons previous callbacks', async () => {
const worker = createWorker();
let firstCallbackHasRun = false;

worker.loadData({source: 'source1', data: JSON.stringify(geoJson)} as LoadGeoJSONParameters).then((result) => {
expect(result && result.abandoned).toBeTruthy();
firstCallbackHasRun = true;
}).catch(() => expect(false).toBeTruthy());
server.respondWith(request => {
request.respond(200, {'Content-Type': 'application/json'}, JSON.stringify(geoJson));
});

worker.loadData({source: 'source1', data: JSON.stringify(geoJson)} as LoadGeoJSONParameters).then((result) => {
expect(result && result.abandoned).toBeFalsy();
expect(firstCallbackHasRun).toBeTruthy();
done();
}).catch(() => expect(false).toBeTruthy());
const p1 = worker.loadData({source: 'source1', request: {url: ''}} as LoadGeoJSONParameters);
await new Promise((resolve) => (setTimeout(resolve, 0)));

const p2 = worker.loadData({source: 'source1', request: {url: ''}} as LoadGeoJSONParameters);

await new Promise((resolve) => (setTimeout(resolve, 0)));

server.respond();

const firstCallResult = await p1;
expect(firstCallResult && firstCallResult.abandoned).toBeTruthy();
const result = await p2;
expect(result && result.abandoned).toBeFalsy();
});

test('removeSource aborts callbacks', done => {
test('removeSource aborts callbacks', async () => {
const worker = createWorker();
let loadDataCallbackHasRun = false;
worker.loadData({source: 'source1', data: JSON.stringify(geoJson)} as LoadGeoJSONParameters).then((result) => {
expect(result && result.abandoned).toBeTruthy();
loadDataCallbackHasRun = true;
}).catch(() => expect(false).toBeTruthy());

worker.removeSource({source: 'source1', type: 'type'}).then(() => {
// Allow promsie to resolve
setTimeout(() => {
expect(loadDataCallbackHasRun).toBeTruthy();
done();
}, 0);
}).catch(() => expect(false).toBeTruthy());
server.respondWith(request => {
request.respond(200, {'Content-Type': 'application/json'}, JSON.stringify(geoJson));
});

const loadPromise = worker.loadData({source: 'source1', request: {url: ''}} as LoadGeoJSONParameters);

await new Promise((resolve) => (setTimeout(resolve, 0)));

const removePromise = worker.removeSource({source: 'source1', type: 'type'});

await new Promise((resolve) => (setTimeout(resolve, 0)));

server.respond();

const result = await loadPromise;
expect(result && result.abandoned).toBeTruthy();
await removePromise;
});

test('loadData with geojson creates an non-updateable source', done => {
Expand Down
163 changes: 76 additions & 87 deletions src/source/geojson_worker_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import type {IActor} from '../util/actor';
import type {StyleLayerIndex} from '../style/style_layer_index';

import type {LoadVectorTileResult} from './vector_tile_worker_source';
import type {RequestParameters, ResponseCallback} from '../util/ajax';
import type {Cancelable} from '../types/cancelable';
import type {RequestParameters} from '../util/ajax';
import {isUpdateableGeoJSON, type GeoJSONSourceDiff, applySourceDiff, toUpdateable, GeoJSONFeatureId} from './geojson_source_diff';
import type {ClusterIDAndSource, GeoJSONWorkerSourceLoadDataResult, RemoveSourceParams} from '../util/actor_messages';

Expand All @@ -44,7 +43,7 @@ export type LoadGeoJSONParameters = GeoJSONWorkerOptions & {
dataDiff?: GeoJSONSourceDiff;
};

export type LoadGeoJSON = (params: LoadGeoJSONParameters, callback: ResponseCallback<any>) => Cancelable;
export type LoadGeoJSON = (params: LoadGeoJSONParameters, abortController: AbortController) => Promise<GeoJSON.GeoJSON>;

type GeoJSONIndex = ReturnType<typeof geojsonvt> | Supercluster;

Expand All @@ -57,8 +56,7 @@ type GeoJSONIndex = ReturnType<typeof geojsonvt> | Supercluster;
* For a full example, see [mapbox-gl-topojson](https://github.com/developmentseed/mapbox-gl-topojson).
*/
export class GeoJSONWorkerSource extends VectorTileWorkerSource {
_pendingPromise: (value: GeoJSONWorkerSourceLoadDataResult) => void;
_pendingRequest: Cancelable;
_pendingRequest: AbortController;
_geoJSONIndex: GeoJSONIndex;
_dataUpdateable = new Map<GeoJSONFeatureId, GeoJSON.Feature>();

Expand Down Expand Up @@ -118,64 +116,54 @@ export class GeoJSONWorkerSource extends VectorTileWorkerSource {
* @param params - the parameters
* @param callback - the callback for completion or error
*/
loadData(params: LoadGeoJSONParameters): Promise<GeoJSONWorkerSourceLoadDataResult> {
this._pendingRequest?.cancel();
if (this._pendingPromise) {
// Tell the foreground the previous call has been abandoned
this._pendingPromise({abandoned: true});
}
return new Promise<GeoJSONWorkerSourceLoadDataResult>((resolve, reject) => {
const perf = (params && params.request && params.request.collectResourceTiming) ?
new RequestPerformance(params.request) : false;

this._pendingPromise = resolve;
this._pendingRequest = this.loadGeoJSON(params, (err?: Error | null, data?: any | null) => {
delete this._pendingPromise;
delete this._pendingRequest;

if (err || !data) {
reject(err);
return;
}
if (typeof data !== 'object') {
reject(new Error(`Input data given to '${params.source}' is not a valid GeoJSON object.`));
return;
}
rewind(data, true);
async loadData(params: LoadGeoJSONParameters): Promise<GeoJSONWorkerSourceLoadDataResult> {
this._pendingRequest?.abort();
const perf = (params && params.request && params.request.collectResourceTiming) ?
new RequestPerformance(params.request) : false;

this._pendingRequest = new AbortController();
try {
let data = await this.loadGeoJSON(params, this._pendingRequest);
delete this._pendingRequest;
if (typeof data !== 'object') {
throw new Error(`Input data given to '${params.source}' is not a valid GeoJSON object.`);
}
rewind(data, true);

try {
if (params.filter) {
const compiled = createExpression(params.filter, {type: 'boolean', 'property-type': 'data-driven', overridable: false, transition: false} as any);
if (compiled.result === 'error')
throw new Error(compiled.value.map(err => `${err.key}: ${err.message}`).join(', '));
if (params.filter) {
const compiled = createExpression(params.filter, {type: 'boolean', 'property-type': 'data-driven', overridable: false, transition: false} as any);
if (compiled.result === 'error')
throw new Error(compiled.value.map(err => `${err.key}: ${err.message}`).join(', '));

const features = data.features.filter(feature => compiled.value.evaluate({zoom: 0}, feature));
data = {type: 'FeatureCollection', features};
}
const features = (data as any).features.filter(feature => compiled.value.evaluate({zoom: 0}, feature));
data = {type: 'FeatureCollection', features};
}

this._geoJSONIndex = params.cluster ?
new Supercluster(getSuperclusterOptions(params)).load(data.features) :
geojsonvt(data, params.geojsonVtOptions);
} catch (err) {
reject(err);
return;
}
this._geoJSONIndex = params.cluster ?
new Supercluster(getSuperclusterOptions(params)).load((data as any).features) :
geojsonvt(data, params.geojsonVtOptions);

this.loaded = {};
this.loaded = {};

const result = {} as { resourceTiming: {[_: string]: Array<PerformanceResourceTiming>} };
if (perf) {
const resourceTimingData = perf.finish();
// it's necessary to eval the result of getEntriesByName() here via parse/stringify
// late evaluation in the main thread causes TypeError: illegal invocation
if (resourceTimingData) {
result.resourceTiming = {};
result.resourceTiming[params.source] = JSON.parse(JSON.stringify(resourceTimingData));
}
const result = {} as GeoJSONWorkerSourceLoadDataResult;
if (perf) {
const resourceTimingData = perf.finish();
// it's necessary to eval the result of getEntriesByName() here via parse/stringify
// late evaluation in the main thread causes TypeError: illegal invocation
if (resourceTimingData) {
result.resourceTiming = {};
result.resourceTiming[params.source] = JSON.parse(JSON.stringify(resourceTimingData));
}
resolve(result);
});
});
}
return result;
} catch (err) {
delete this._pendingRequest;
// HM TODO: make this message a constant somewhere
if (err.message === 'AbortError') {
return {abandoned: true};
}
throw err;
}
}

/**
Expand Down Expand Up @@ -209,48 +197,49 @@ export class GeoJSONWorkerSource extends VectorTileWorkerSource {
* @param callback - the callback for completion or error
* @returns A Cancelable object.
*/
loadGeoJSON = (params: LoadGeoJSONParameters, callback: ResponseCallback<any>): Cancelable => {
loadGeoJSON = async (params: LoadGeoJSONParameters, abortController: AbortController): Promise<GeoJSON.GeoJSON> => {
const {promoteId} = params;
// Because of same origin issues, urls must either include an explicit
// origin or absolute path.
// ie: /foo/bar.json or http://example.com/bar.json
// but not ../foo/bar.json
if (params.request) {
return getJSON(params.request, (
error?: Error,
data?: any,
cacheControl?: string,
expires?: string
) => {
this._dataUpdateable = isUpdateableGeoJSON(data, promoteId) ? toUpdateable(data, promoteId) : undefined;
callback(error, data, cacheControl, expires);
return new Promise<GeoJSON.GeoJSON>((resolve, reject) => {
const request = getJSON(params.request, (
error?: Error,
data?: any
) => {
this._dataUpdateable = isUpdateableGeoJSON(data, promoteId) ? toUpdateable(data, promoteId) : undefined;
if (error) {
reject(error);
return;
}
resolve(data);
});
abortController.signal.addEventListener('abort', () => {
request.cancel();
reject(new Error('AbortError'));
});
});
} else if (typeof params.data === 'string') {
}
if (typeof params.data === 'string') {
try {
const parsed = JSON.parse(params.data);
this._dataUpdateable = isUpdateableGeoJSON(parsed, promoteId) ? toUpdateable(parsed, promoteId) : undefined;
callback(null, parsed);
return parsed;
} catch (e) {
callback(new Error(`Input data given to '${params.source}' is not a valid GeoJSON object.`));
}
} else if (params.dataDiff) {
if (this._dataUpdateable) {
applySourceDiff(this._dataUpdateable, params.dataDiff, promoteId);
callback(null, {type: 'FeatureCollection', features: Array.from(this._dataUpdateable.values())});
} else {
callback(new Error(`Cannot update existing geojson data in ${params.source}`));
throw new Error(`Input data given to '${params.source}' is not a valid GeoJSON object.`);
}
} else {
callback(new Error(`Input data given to '${params.source}' is not a valid GeoJSON object.`));
}

return {cancel: () => {}};
if (!params.dataDiff) {
throw new Error(`Input data given to '${params.source}' is not a valid GeoJSON object.`);
}
if (!this._dataUpdateable) {
throw new Error(`Cannot update existing geojson data in ${params.source}`);
}
applySourceDiff(this._dataUpdateable, params.dataDiff, promoteId);
return {type: 'FeatureCollection', features: Array.from(this._dataUpdateable.values())};
};

async removeSource(_params: RemoveSourceParams): Promise<void> {
if (this._pendingPromise) {
// Don't leak callbacks
this._pendingPromise({abandoned: true});
if (this._pendingRequest) {
this._pendingRequest.abort();
}
}

Expand Down
1 change: 1 addition & 0 deletions src/source/vector_tile_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ export class VectorTileSource extends Evented implements Source {
return callback(err);
}
// HM TODO: add a unit test that gets here with error status 404
// HM TODO: add a unit test that gets here with data that is null - empty geojson
if (data && data.resourceTiming) {
tile.resourceTiming = data.resourceTiming;
}
Expand Down

0 comments on commit f5b4206

Please sign in to comment.