import * as stream from 'stream'
import * as assert from 'assert'

/**
 * One stage of a lazily assembled stream pipeline.
 *
 * `build()` returns the streams of every stage up to and including this one,
 * in order. NOTE(review): the array's element type was not recoverable from
 * the source; it is declared here as the union `stream.pipeline` accepts.
 */
export interface StreamStageBuilder {
  build(): Array<NodeJS.ReadableStream | NodeJS.WritableStream>
}

/**
 * Appends a synchronous mapping stage.
 *
 * @param prev - Builder for the preceding stages.
 * @param mapFn - Applied to every element; whatever it throws is forwarded to
 *   the transform callback as the stream error.
 * @returns A builder whose `build()` yields `prev`'s streams plus one
 *   object-mode `Transform`.
 */
export function MappedStreamBuilder<T, U>(prev: StreamStageBuilder, mapFn: (t: T) => U) {
  return {
    build() {
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, callback) {
            let mapped: U
            // Only mapFn is guarded: if the callback itself threw inside the
            // try block it would be invoked a second time from the catch.
            try {
              mapped = mapFn(v)
            } catch (e) {
              // Forward the thrown value unchanged.
              callback(e as Error)
              return
            }
            callback(undefined, mapped)
          },
        }),
      ]
    },
  }
}

/**
 * Appends an asynchronous mapping stage that preserves input order.
 *
 * Two transforms are produced: the first starts `mapFn` and pushes the
 * resulting promise downstream, the second resolves those promises one by one
 * so results come out in input order.
 *
 * @param prev - Builder for the preceding stages.
 * @param maxConcurrency - Maximum number of `mapFn` calls in flight at once.
 *   The write callback is withheld while the limit is reached.
 * @param mapFn - Asynchronous mapper. A rejection is forwarded to the second
 *   transform's callback as the stream error.
 * @throws AssertionError from inside the transform if the in-flight count
 *   exceeds `maxConcurrency` (e.g. when `maxConcurrency` is below 1).
 */
export function MappedAsyncStreamBuilder<T, U>(
  prev: StreamStageBuilder,
  maxConcurrency: number,
  mapFn: (t: T) => Promise<U>,
) {
  return {
    build() {
      // State is per build() so pipelines built from the same builder do not
      // share (and overflow) one counter.
      let concurrency = 0
      // The write callback withheld while the limit is reached, if any.
      let resume: (() => void) | undefined

      const release = (): void => {
        concurrency--
        if (resume !== undefined) {
          const next = resume
          resume = undefined
          next()
        }
      }

      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, callback) {
            concurrency++
            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
            const promise = Promise.resolve(v).then(mapFn)
            this.push(promise)
            if (concurrency < maxConcurrency) {
              callback()
            } else {
              // Back-pressure: accept no further input until a slot frees up.
              resume = callback
            }
            // then(f, f) rather than finally(f): finally() returns a promise
            // that re-rejects, which nothing would handle. The rejection is
            // still reported by the next stage through the pushed promise.
            promise.then(release, release)
          },
        }),
        new stream.Transform({
          objectMode: true,
          writableHighWaterMark: maxConcurrency,
          transform(promise: Promise<U>, _encoding, callback) {
            promise.then(result => callback(undefined, result), callback)
          },
        }),
      ]
    },
  }
}

/**
 * Appends an asynchronous mapping stage that emits results as soon as they
 * are ready, regardless of input order.
 *
 * @param prev - Builder for the preceding stages.
 * @param maxConcurrency - Maximum number of `mapFn` calls in flight at once.
 *   The write callback is withheld while the limit is reached.
 * @param mapFn - Asynchronous mapper. A rejection destroys the transform with
 *   the rejection reason.
 * @throws AssertionError from inside the transform if the in-flight count
 *   exceeds `maxConcurrency` (e.g. when `maxConcurrency` is below 1).
 */
export function MappedAsyncUnorderedStreamBuilder<T, U>(
  prev: StreamStageBuilder,
  maxConcurrency: number,
  mapFn: (t: T) => Promise<U>,
) {
  return {
    build() {
      // State is per build() so pipelines built from the same builder do not
      // share (and overflow) one counter.
      let concurrency = 0
      // The write callback withheld while the limit is reached, if any.
      let resume: (() => void) | undefined
      // Work still in flight; flush waits for all of it before ending.
      const pending = new Set<Promise<void>>()

      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, onNext) {
            concurrency++
            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
            if (concurrency < maxConcurrency) {
              onNext()
            } else {
              // Back-pressure: accept no further input until a slot frees up.
              resume = onNext
            }
            const tracked: Promise<void> = Promise.resolve(v)
              .then(mapFn)
              .then(result => {
                this.push(result)
              })
              .catch(err => {
                // The write callback for this element may already have been
                // used, so the error is reported by destroying the stream
                // instead of calling that callback again.
                this.destroy(err)
              })
              .then(() => {
                // Remove the very promise that was added, so the set does not
                // grow with the length of the stream.
                pending.delete(tracked)
                concurrency--
                if (resume !== undefined) {
                  const next = resume
                  resume = undefined
                  next()
                }
              })
            pending.add(tracked)
          },
          flush(onEnd) {
            Promise.all(pending).then(() => onEnd(), onEnd)
          },
        }),
      ]
    },
  }
}
/**
 * Appends a stage that groups elements into arrays of `batchSize`.
 *
 * A batch is emitted as soon as it holds at least `batchSize` elements; any
 * remainder is emitted as a final, shorter batch when the input ends.
 *
 * @param prev - Builder for the preceding stages.
 * @param batchSize - Number of elements per emitted batch.
 * @returns A builder whose `build()` yields `prev`'s streams plus one
 *   object-mode `Transform`.
 */
export function BatchedStreamBuilder<T>(prev: StreamStageBuilder, batchSize: number) {
  return {
    build() {
      // The buffer is per build() so a second pipeline built from the same
      // builder does not start with leftovers from the first.
      let currentBatch: T[] = []
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, callback) {
            currentBatch.push(v)
            if (currentBatch.length >= batchSize) {
              // Swap in a fresh buffer before handing the full one over, so
              // nothing can be appended to an array that was already emitted.
              const full = currentBatch
              currentBatch = []
              callback(undefined, full)
            } else {
              callback()
            }
          },
          flush(callback) {
            if (currentBatch.length > 0) {
              const rest = currentBatch
              currentBatch = []
              callback(undefined, rest)
            } else {
              callback()
            }
          },
        }),
      ]
    },
  }
}

/**
 * Appends a stage that lets through only the elements accepted by `predicate`.
 *
 * @param prev - Builder for the preceding stages.
 * @param predicate - Returns `true` to keep an element. Whatever it throws is
 *   forwarded to the transform callback as the stream error.
 * @returns A builder whose `build()` yields `prev`'s streams plus one
 *   object-mode `Transform`.
 */
export function FilteredStreamBuilder<T>(prev: StreamStageBuilder, predicate: (v: T) => boolean) {
  return {
    build() {
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, onNext) {
            let keep: boolean
            // Only the predicate is guarded: if push/onNext threw inside the
            // try block, onNext would be invoked a second time from the catch.
            try {
              keep = predicate(v)
            } catch (e) {
              // Forward the thrown value unchanged.
              onNext(e as Error)
              return
            }
            if (keep) {
              this.push(v)
            }
            onNext()
          },
        }),
      ]
    },
  }
}

/**
 * Appends a stage that limits throughput to roughly `elements` per
 * `perDurationMs` milliseconds.
 *
 * Every element is forwarded immediately. Once `elements` timestamps fall
 * inside the current window, the write callback is delayed until the oldest
 * of them leaves the window.
 *
 * NOTE(review): `timestamps` lives outside `build()`, so every pipeline built
 * from one builder shares a single rate window. That may be intentional (a
 * shared limit) and is left as found; confirm against callers.
 *
 * @param prev - Builder for the preceding stages.
 * @param elements - Number of elements allowed per window.
 * @param perDurationMs - Window length in milliseconds.
 * @returns A builder whose `build()` yields `prev`'s streams plus one
 *   object-mode `Transform`.
 */
export function ThrottledStreamBuilder(prev: StreamStageBuilder, elements: number, perDurationMs: number) {
  let timestamps: number[] = []
  return {
    build() {
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v, _encoding, onNext) {
            const current = Date.now()
            // Drop timestamps that left the window, then record this element.
            timestamps = [...timestamps.filter(t => t > current - perDurationMs), current]
            this.push(v)
            if (timestamps.length >= elements) {
              // timestamps always contains at least `current` at this point.
              const timeToWait = timestamps[0] + perDurationMs - Date.now()
              setTimeout(onNext, timeToWait)
            } else {
              onNext()
            }
          },
        }),
      ]
    },
  }
}