|
|
@@ -39,9 +39,9 @@ export function MappedAsyncStreamBuilder<T, U>(
|
|
|
maxConcurrency: number,
|
|
|
mapFn: (t: T) => Promise<U>,
|
|
|
) {
|
|
|
- let concurrency = 0
|
|
|
return {
|
|
|
build() {
|
|
|
+ let concurrency = 0
|
|
|
return [
|
|
|
...prev.build(),
|
|
|
new stream.Transform({
|
|
|
@@ -81,9 +81,9 @@ export function MappedAsyncUnorderedStreamBuilder<T, U>(
|
|
|
maxConcurrency: number,
|
|
|
mapFn: (t: T) => Promise<U>,
|
|
|
) {
|
|
|
- let concurrency = 0
|
|
|
return {
|
|
|
build() {
|
|
|
+ let concurrency = 0
|
|
|
const promises: Set<Promise<unknown>> = new Set()
|
|
|
return [
|
|
|
...prev.build(),
|
|
|
@@ -120,9 +120,9 @@ export function MappedAsyncUnorderedStreamBuilder<T, U>(
|
|
|
}
|
|
|
|
|
|
export function BatchedStreamBuilder<T>(prev: StreamStageBuilder<T>, batchSize: number) {
|
|
|
- let currentBatch: T[] = []
|
|
|
return {
|
|
|
build() {
|
|
|
+ let currentBatch: T[] = []
|
|
|
return [
|
|
|
...prev.build(),
|
|
|
new stream.Transform({
|
|
|
@@ -173,9 +173,10 @@ export function FilteredStreamBuilder<T>(prev: StreamStageBuilder<T>, predicate:
|
|
|
}
|
|
|
|
|
|
export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements: number, perDurationMs: number) {
|
|
|
- let timestamps: number[] = []
|
|
|
return {
|
|
|
build() {
|
|
|
+ let timestamps: number[] = []
|
|
|
+
|
|
|
return [
|
|
|
...prev.build(),
|
|
|
new stream.Transform({
|