| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- import * as stream from 'stream'
- import * as assert from 'assert'
/**
 * A deferred pipeline stage whose output values have type `T`.
 *
 * `T` does not occur in the signature of `build()`; it is a type-level tag that lets the
 * builder functions in this module chain typed stages (e.g. a mapping from `T` to `U`).
 */
export interface StreamStageBuilder<T> {
  /**
   * Instantiates the streams for this stage together with those of its upstream stages,
   * upstream first, ready to be piped together in array order.
   */
  build(): (NodeJS.ReadableStream | NodeJS.ReadWriteStream)[]
}
/**
 * Appends a synchronous mapping stage: every value produced by `prev` is passed through
 * `mapFn` and the result is emitted downstream in the same order.
 *
 * A `null`/`undefined` result is handed to the Transform callback, which does not push
 * nullish data, so such results are dropped instead of ending the stream.
 *
 * @param prev - Upstream stage feeding this one.
 * @param mapFn - Mapping function. If it throws, the stream fails with that error; a thrown
 *   non-`Error` value is wrapped in an `Error` so stream consumers always receive an `Error`.
 * @returns A builder whose `build()` yields the upstream streams followed by one object-mode Transform.
 */
export function MappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, mapFn: (t: T) => U): StreamStageBuilder<U> {
  return {
    build() {
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, callback) {
            let mapped: U
            try {
              mapped = mapFn(v)
            } catch (e: unknown) {
              callback(e instanceof Error ? e : new Error(String(e)))
              return
            }
            // Invoked outside the try: if downstream processing throws synchronously from inside
            // the callback, we must not catch it and call the callback a second time.
            callback(undefined, mapped)
          },
        }),
      ]
    },
  }
}
/**
 * Appends an ordered asynchronous mapping stage with bounded concurrency.
 *
 * At most `maxConcurrency` calls to `mapFn` are unsettled at any time, and results are emitted
 * in input order regardless of completion order. This is implemented as two object-mode
 * Transforms: the first starts `mapFn` and forwards the resulting promise, and the second
 * awaits those promises one at a time in arrival order.
 *
 * @param prev - Upstream stage feeding this one.
 * @param maxConcurrency - Maximum number of in-flight `mapFn` promises; must be a positive
 *   integer (it is also used as the second Transform's `writableHighWaterMark`).
 * @param mapFn - Async mapper. A rejection fails the stream when that element's turn comes in
 *   the second Transform. `null`/`undefined` results are dropped (Transform callback semantics).
 * @returns A builder whose `build()` yields the upstream streams followed by two Transforms.
 * @throws RangeError if `maxConcurrency` is not a positive integer.
 */
export function MappedAsyncStreamBuilder<T, U>(
  prev: StreamStageBuilder<T>,
  maxConcurrency: number,
  mapFn: (t: T) => Promise<U>,
): StreamStageBuilder<U> {
  if (!Number.isInteger(maxConcurrency) || maxConcurrency < 1) {
    throw new RangeError(`maxConcurrency must be a positive integer (got ${maxConcurrency})`)
  }
  return {
    build() {
      // Per-build state, so that building the same builder twice yields independent pipelines.
      let inFlight = 0
      // Write callback withheld while the limit is reached. It is released by whichever promise
      // settles first, which is not necessarily the promise whose transform call withheld it.
      let pendingNext: stream.TransformCallback | undefined
      const release = (): void => {
        inFlight--
        const next = pendingNext
        pendingNext = undefined
        if (next) {
          next()
        }
      }
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, onNext) {
            // Invariant: transform is not called again while a callback is withheld.
            assert.ok(inFlight < maxConcurrency, `too much concurrent data (concurrency: ${inFlight + 1})`)
            inFlight++
            const promise = Promise.resolve(v).then(mapFn)
            // Handling both outcomes here frees the slot on success or failure. It also marks the
            // promise as handled, so a rejection that occurs while the promise is still buffered
            // for the second Transform is not reported as an unhandled rejection. The second
            // Transform still observes the rejection through its own `.then`.
            promise.then(release, release)
            this.push(promise)
            if (inFlight < maxConcurrency) {
              onNext()
            } else {
              pendingNext = onNext
            }
          },
        }),
        new stream.Transform({
          objectMode: true,
          // Lets the awaiting stage buffer one promise per possible in-flight call.
          writableHighWaterMark: maxConcurrency,
          transform(promise: Promise<U>, _encoding, callback) {
            promise.then(
              (result) => callback(undefined, result),
              (err: unknown) => callback(err instanceof Error ? err : new Error(String(err))),
            )
          },
        }),
      ]
    },
  }
}
/**
 * Appends an unordered asynchronous mapping stage with bounded concurrency.
 *
 * At most `maxConcurrency` calls to `mapFn` are unsettled at any time. Each result is emitted
 * as soon as its promise resolves, so output order follows completion order. At end of input,
 * the stage waits for all in-flight calls before ending.
 *
 * @param prev - Upstream stage feeding this one.
 * @param maxConcurrency - Maximum number of in-flight `mapFn` promises; must be >= 1.
 * @param mapFn - Async mapper. A rejection destroys the stage with that error (non-`Error`
 *   reasons are wrapped). `null`/`undefined` results are skipped: pushing `null` would
 *   otherwise end the stream early, and skipping both matches the ordered variant.
 * @returns A builder whose `build()` yields the upstream streams followed by one Transform.
 * @throws RangeError if `maxConcurrency` is not >= 1.
 */
export function MappedAsyncUnorderedStreamBuilder<T, U>(
  prev: StreamStageBuilder<T>,
  maxConcurrency: number,
  mapFn: (t: T) => Promise<U>,
): StreamStageBuilder<U> {
  if (!(maxConcurrency >= 1)) {
    throw new RangeError(`maxConcurrency must be >= 1 (got ${maxConcurrency})`)
  }
  return {
    build() {
      // Per-build state, so that building the same builder twice yields independent pipelines.
      let inFlight = 0
      // Write callback withheld while the limit is reached; released by whichever call settles first.
      let pendingNext: stream.TransformCallback | undefined
      // Flush callback deferred until the last in-flight call has settled.
      let pendingFlush: stream.TransformCallback | undefined
      const release = (): void => {
        inFlight--
        const next = pendingNext
        pendingNext = undefined
        if (next) {
          next()
        }
        if (inFlight === 0 && pendingFlush) {
          const done = pendingFlush
          pendingFlush = undefined
          done()
        }
      }
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, onNext) {
            // Invariant: transform is not called again while a callback is withheld.
            assert.ok(inFlight < maxConcurrency, `too much concurrent data (concurrency: ${inFlight + 1})`)
            inFlight++
            Promise.resolve(v)
              .then(mapFn)
              .then(
                (result) => {
                  // After a failure the stage is destroyed; late results are discarded.
                  if (this.destroyed) {
                    return
                  }
                  if (result != null) {
                    this.push(result)
                  }
                  release()
                },
                (err: unknown) => {
                  // The write callback may already have been called, so report via destroy.
                  this.destroy(err instanceof Error ? err : new Error(String(err)))
                },
              )
            if (inFlight < maxConcurrency) {
              onNext()
            } else {
              pendingNext = onNext
            }
          },
          flush(onEnd) {
            if (inFlight === 0) {
              onEnd()
            } else {
              pendingFlush = onEnd
            }
          },
        }),
      ]
    },
  }
}
/**
 * Appends a batching stage that groups consecutive values into arrays of `batchSize`.
 *
 * A final, shorter batch is emitted at end of input if any values remain. Batch state is
 * created per `build()`, so the same builder can back several independent pipelines.
 *
 * @param prev - Upstream stage feeding this one.
 * @param batchSize - Number of values per emitted batch; must be >= 1.
 * @returns A builder whose `build()` yields the upstream streams followed by one Transform.
 * @throws RangeError if `batchSize` is not >= 1.
 */
export function BatchedStreamBuilder<T>(prev: StreamStageBuilder<T>, batchSize: number): StreamStageBuilder<T[]> {
  if (!(batchSize >= 1)) {
    throw new RangeError(`batchSize must be >= 1 (got ${batchSize})`)
  }
  return {
    build() {
      let currentBatch: T[] = []
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, callback) {
            currentBatch.push(v)
            if (currentBatch.length >= batchSize) {
              // Detach the full batch before handing it off, so nothing can append to an
              // array that has already been emitted.
              const full = currentBatch
              currentBatch = []
              callback(undefined, full)
            } else {
              callback()
            }
          },
          flush(callback) {
            const rest = currentBatch
            currentBatch = []
            if (rest.length > 0) {
              callback(undefined, rest)
            } else {
              callback()
            }
          },
        }),
      ]
    },
  }
}
/**
 * Appends a filtering stage that forwards only the values for which `predicate` is truthy,
 * preserving their order.
 *
 * @param prev - Upstream stage feeding this one.
 * @param predicate - Keep/drop decision per value. If it throws, the stream fails with that
 *   error; a thrown non-`Error` value is wrapped in an `Error`.
 * @returns A builder whose `build()` yields the upstream streams followed by one Transform.
 */
export function FilteredStreamBuilder<T>(prev: StreamStageBuilder<T>, predicate: (v: T) => boolean): StreamStageBuilder<T> {
  return {
    build() {
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, onNext) {
            let keep: boolean
            try {
              keep = predicate(v)
            } catch (e: unknown) {
              onNext(e instanceof Error ? e : new Error(String(e)))
              return
            }
            // Kept outside the try so a synchronous throw from downstream cannot lead to
            // onNext being invoked twice.
            if (keep) {
              this.push(v)
            }
            onNext()
          },
        }),
      ]
    },
  }
}
/**
 * Appends a rate-limiting stage that lets roughly `elements` values through per sliding
 * window of `perDurationMs` milliseconds.
 *
 * Each value is emitted immediately. When the window already holds `elements` emission
 * timestamps, the stage delays its write callback (and therefore accepting the next value)
 * until the oldest timestamp has left the window. The window is created per `build()`, so
 * separate pipelines built from the same builder do not throttle each other.
 *
 * @param prev - Upstream stage feeding this one.
 * @param elements - Number of values allowed per window.
 * @param perDurationMs - Window length in milliseconds.
 * @returns A builder whose `build()` yields the upstream streams followed by one Transform.
 */
export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements: number, perDurationMs: number): StreamStageBuilder<T> {
  return {
    build() {
      // Date.now() values of emissions that are still inside the current window.
      let timestamps: number[] = []
      return [
        ...prev.build(),
        new stream.Transform({
          objectMode: true,
          transform(v: T, _encoding, onNext) {
            const current = Date.now()
            timestamps = timestamps.filter((t) => t > current - perDurationMs)
            timestamps.push(current)
            this.push(v)
            if (timestamps.length >= elements) {
              // timestamps is non-empty here (current was just appended).
              const oldest = timestamps[0] ?? current
              setTimeout(onNext, oldest + perDurationMs - Date.now())
            } else {
              onNext()
            }
          },
        }),
      ]
    },
  }
}
|