import {
  StreamStageBuilder,
  MappedStreamBuilder,
  MappedAsyncStreamBuilder,
  MappedAsyncUnorderedStreamBuilder,
  BatchedStreamBuilder,
  ThrottledStreamBuilder,
  FilteredStreamBuilder,
  LinearShape,
} from './internal/StreamStageBuilder'
import { Phantom } from './internal/common'
import type { Sink } from './Sink'

/**
 * Largest concurrency value handed to the async stage builders.
 *
 * Kept at the signed 32-bit maximum so that normalized values stay inside the
 * range the previous `0 | concurrency` coercion could produce.
 */
const MAX_CONCURRENCY = 0x7fffffff

/**
 * Coerces a caller-supplied concurrency into an integer in
 * `[1, MAX_CONCURRENCY]`.
 *
 * - `NaN` and anything below 1 (including negatives and `-Infinity`) become 1.
 * - Fractional values are truncated toward zero.
 * - Values above `MAX_CONCURRENCY` (including `Infinity`) are clamped.
 *
 * The earlier inline expression `concurrency < 1 ? 1 : 0 | concurrency`
 * relied on a 32-bit integer conversion, which turned `NaN`, `Infinity` and
 * `2 ** 32` into 0 and `2 ** 31` into a negative number.
 *
 * @param concurrency Requested number of concurrent operations.
 * @returns A safe integer concurrency, never less than 1.
 */
function normalizeConcurrency(concurrency: number): number {
  if (Number.isNaN(concurrency) || concurrency < 1) {
    return 1
  }
  return Math.min(Math.trunc(concurrency), MAX_CONCURRENCY)
}

/** Stage builder whose `build()` yields no streams; the starting point for `Flow.of()`. */
const EmptyFlow: StreamStageBuilder = {
  build() {
    return []
  },
}

/**
 * Chainable description of a linear sequence of stream stages.
 *
 * Every stage method leaves the receiver untouched and returns a new `Flow`
 * wrapping a builder derived from the current one, so a flow can be reused as
 * a prefix for several pipelines.
 *
 * NOTE(review): `Input`, `Output` and `O` are referenced below but not
 * declared anywhere in this view — presumably the generic parameter lists
 * were lost; confirm against the original file.
 */
export class Flow extends LinearShape {
  /** Type-level marker only (declared with `!`, never assigned here). */
  protected __phantom__!: { input: Input; output: Output }

  private constructor(protected builder: StreamStageBuilder) {
    super(builder)
  }

  /** Creates a flow with no stages (its builder produces an empty stream list). */
  static of(): Flow {
    return new Flow(EmptyFlow)
  }

  /**
   * Appends a stage created by `MappedStreamBuilder` from `f`.
   *
   * @param f Function applied to each element.
   */
  map(f: (v: Output) => O): Flow {
    return new Flow(MappedStreamBuilder(this.builder, f))
  }

  /**
   * Appends a stage created by `MappedAsyncStreamBuilder` from the
   * promise-returning function `f`.
   *
   * @param concurrency Requested number of concurrent calls; normalized to an
   *   integer of at least 1 (see `normalizeConcurrency`).
   * @param f Promise-returning function applied to each element.
   */
  mapAsync(concurrency: number, f: (v: Output) => Promise): Flow {
    return new Flow(MappedAsyncStreamBuilder(this.builder, normalizeConcurrency(concurrency), f))
  }

  /**
   * Same as `mapAsync`, but built with `MappedAsyncUnorderedStreamBuilder`
   * (presumably results are emitted in completion order — confirm in the
   * builder's implementation).
   *
   * @param concurrency Requested number of concurrent calls; normalized to an
   *   integer of at least 1 (see `normalizeConcurrency`).
   * @param f Promise-returning function applied to each element.
   */
  mapAsyncUnordered(concurrency: number, f: (v: Output) => Promise): Flow {
    return new Flow(
      MappedAsyncUnorderedStreamBuilder(this.builder, normalizeConcurrency(concurrency), f),
    )
  }

  /**
   * Appends a stage created by `BatchedStreamBuilder`.
   *
   * @param n Passed through unchanged to the builder (presumably the batch
   *   size — confirm).
   */
  grouped(n: number): Flow {
    return new Flow(BatchedStreamBuilder(this.builder, n))
  }

  /**
   * Appends a stage created by `FilteredStreamBuilder` from `predicate`.
   *
   * @param predicate Boolean test evaluated for each element.
   */
  filter(predicate: (v: Output) => boolean): Flow {
    return new Flow(FilteredStreamBuilder(this.builder, predicate))
  }

  /**
   * Appends a stage created by `ThrottledStreamBuilder`.
   *
   * @param elements Passed through unchanged (presumably the number of
   *   elements allowed per window — confirm).
   * @param perDurationMs Passed through unchanged (presumably the window
   *   length in milliseconds — confirm).
   */
  throttle(elements: number, perDurationMs: number): Flow {
    return new Flow(ThrottledStreamBuilder(this.builder, elements, perDurationMs))
  }

  /**
   * Connects this flow to `sink`, producing a new sink whose stream list is
   * this flow's stages followed by the sink's own streams (if it has any).
   *
   * @param sink Sink that terminates the pipeline.
   * @returns A sink that reuses `sink`'s `buildWritable`.
   */
  into(sink: Sink): Sink {
    return {
      __phantom__: Phantom(),
      builder: {
        // Arrow function keeps `this` bound to the flow without a `self` alias.
        buildStreams: () => [
          ...this.builder.build(),
          ...(sink.builder.buildStreams?.() ?? []),
        ],
        // Forwarded by reference, not re-bound.
        // NOTE(review): if the sink's buildWritable relies on `this`, it
        // would need binding to `sink.builder` — confirm.
        buildWritable: sink.builder.buildWritable,
      },
    }
  }
}