| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- import {
- StreamStageBuilder,
- MappedStreamBuilder,
- MappedAsyncStreamBuilder,
- MappedAsyncUnorderedStreamBuilder,
- BatchedStreamBuilder,
- ThrottledStreamBuilder,
- FilteredStreamBuilder,
- LinearShape,
- } from './internal/StreamStageBuilder'
- import { Phantom } from './internal/common'
- import type { Sink } from './Sink'
/**
 * A linear chain of stream stages that accepts `Input` elements and emits
 * `Output` elements.
 *
 * Every combinator returns a new `Flow` wrapping a new stage builder; the
 * receiver is not modified. Use {@link Flow.of} to start a chain and
 * {@link Flow.into} to attach it in front of a `Sink`.
 */
export class Flow<Input, Output> extends LinearShape<Output> {
  /** Largest concurrency forwarded to a builder: the maximum signed 32-bit integer. */
  private static readonly MAX_CONCURRENCY = 0x7fffffff

  /** Type-level marker only; never assigned at runtime (hence the `!`). */
  protected __phantom__!: { input: Input; output: Output }

  private constructor(protected builder: StreamStageBuilder<Output>) {
    super(builder)
  }

  /**
   * Coerces a caller-supplied concurrency into an integer in
   * `[1, MAX_CONCURRENCY]`.
   *
   * `NaN` and anything below 1 become 1; fractional values are truncated.
   * The previous `0 | concurrency` coercion turned `NaN` and `Infinity` into 0
   * and wrapped values of 2^31 or more, so the "at least 1" guarantee did not
   * hold for those inputs. Results for inputs in `[1, 2^31)` are unchanged.
   */
  private static normalizeConcurrency(concurrency: number): number {
    if (Number.isNaN(concurrency) || concurrency < 1) {
      return 1
    }
    return Math.min(Math.trunc(concurrency), Flow.MAX_CONCURRENCY)
  }

  /**
   * Creates a flow with no stages, whose output type equals its input type.
   */
  static of<T>(): Flow<T, T> {
    return new Flow(EmptyFlow)
  }

  /**
   * Appends a stage created by `MappedStreamBuilder` from the current builder
   * and `f`.
   *
   * @param f - Function applied to each `Output` element.
   * @returns A new flow emitting the mapped type.
   */
  map<O>(f: (v: Output) => O): Flow<Input, O> {
    return new Flow(MappedStreamBuilder(this.builder, f))
  }

  /**
   * Appends a stage created by `MappedAsyncStreamBuilder`.
   *
   * @param concurrency - Requested concurrency; coerced to an integer of at
   *   least 1 before being handed to the builder.
   * @param f - Promise-returning function applied to each element.
   * @returns A new flow emitting the resolved type.
   */
  mapAsync<O>(concurrency: number, f: (v: Output) => Promise<O>): Flow<Input, O> {
    return new Flow(MappedAsyncStreamBuilder(this.builder, Flow.normalizeConcurrency(concurrency), f))
  }

  /**
   * Appends a stage created by `MappedAsyncUnorderedStreamBuilder`.
   * NOTE(review): presumably results are emitted in completion order rather
   * than input order; confirm against the builder implementation.
   *
   * @param concurrency - Requested concurrency; coerced to an integer of at
   *   least 1 before being handed to the builder.
   * @param f - Promise-returning function applied to each element.
   * @returns A new flow emitting the resolved type.
   */
  mapAsyncUnordered<O>(concurrency: number, f: (v: Output) => Promise<O>): Flow<Input, O> {
    return new Flow(MappedAsyncUnorderedStreamBuilder(this.builder, Flow.normalizeConcurrency(concurrency), f))
  }

  /**
   * Appends a stage created by `BatchedStreamBuilder`, producing arrays of
   * `Output`.
   *
   * @param n - Passed through unchanged to the builder.
   *   NOTE(review): presumably the batch size; no validation is done here.
   */
  grouped(n: number): Flow<Input, Output[]> {
    return new Flow(BatchedStreamBuilder(this.builder, n))
  }

  /**
   * Appends a stage created by `FilteredStreamBuilder` from the current
   * builder and `predicate`.
   *
   * @param predicate - Test applied to each `Output` element.
   */
  filter(predicate: (v: Output) => boolean): Flow<Input, Output> {
    return new Flow(FilteredStreamBuilder(this.builder, predicate))
  }

  /**
   * Appends a stage created by `ThrottledStreamBuilder`.
   *
   * The return type is annotated explicitly so `Input` is preserved; without
   * it the constructor call infers `Flow<unknown, Output>`.
   *
   * @param elements - Passed through unchanged to the builder.
   * @param perDurationMs - Passed through unchanged to the builder.
   */
  throttle(elements: number, perDurationMs: number): Flow<Input, Output> {
    return new Flow(ThrottledStreamBuilder(this.builder, elements, perDurationMs))
  }

  /**
   * Attaches this flow in front of `sink`, yielding a sink that accepts this
   * flow's `Input`.
   *
   * The resulting builder's `buildStreams` returns this flow's built stages
   * followed by the sink's own streams (if the sink defines `buildStreams`).
   *
   * @param sink - Sink consuming this flow's `Output`.
   * @returns A sink over `Input` with the same materialized type `M`.
   */
  into<M>(sink: Sink<Output, M>): Sink<Input, M> {
    return {
      __phantom__: Phantom(),
      builder: {
        // Arrow function: `this` stays the flow, and both builders are read
        // lazily each time streams are built.
        buildStreams: () => [...this.builder.build(), ...(sink.builder.buildStreams?.() ?? [])],
        // NOTE(review): forwarded as an unbound reference, so inside
        // `buildWritable` `this` is this new builder object, not `sink.builder`.
        // Confirm that sink builders do not depend on their own `this`.
        buildWritable: sink.builder.buildWritable,
      },
    }
  }
}
/**
 * Stage builder with no stages: `build` yields a new empty array on every
 * call. Used as the starting builder for `Flow.of`.
 */
const EmptyFlow: StreamStageBuilder<any> = {
  build: () => [],
}
|