// Flow.ts
  1. import {
  2. StreamStageBuilder,
  3. MappedStreamBuilder,
  4. MappedAsyncStreamBuilder,
  5. MappedAsyncUnorderedStreamBuilder,
  6. BatchedStreamBuilder,
  7. ThrottledStreamBuilder,
  8. FilteredStreamBuilder,
  9. LinearShape,
  10. } from './internal/StreamStageBuilder'
  11. import { Phantom } from './internal/common'
  12. import type { Sink } from './Sink'
  /**
   * A chain of stream stages with a compile-time `Input` type and a current
   * `Output` type.
   *
   * Every combinator returns a NEW `Flow` whose builder wraps this flow's
   * builder; the receiver is never mutated. `Input` exists only at the type
   * level: no method in this class reads or writes a value of that type.
   */
  export class Flow<Input, Output> extends LinearShape<Output> {
    /** Type-only marker carrying `Input`/`Output`; never assigned in this class. */
    protected __phantom__!: { input: Input; output: Output }

    private constructor(protected builder: StreamStageBuilder<Output>) {
      super(builder)
    }

    /**
     * Coerces a caller-supplied concurrency into an integer in `[1, 2^31 - 1]`.
     *
     * The previous inline expression (`concurrency < 1 ? 1 : 0 | concurrency`)
     * produced `0` for `NaN` and `Infinity`, and a wrapped (possibly negative)
     * value for inputs >= 2^31, because `0 | x` converts through ToInt32.
     * For every input in `[1, 2^31)` the result is identical to the old one.
     *
     * @param concurrency - requested concurrency; any number is accepted.
     * @returns `1` for `NaN` or anything below 1, otherwise the truncated value
     *          capped at `2^31 - 1`.
     */
    private static normalizeConcurrency(concurrency: number): number {
      const maxConcurrency = 0x7fffffff
      // `!(x >= 1)` is deliberately not `x < 1`: it is also true for NaN.
      if (!(concurrency >= 1)) {
        return 1
      }
      return Math.min(Math.trunc(concurrency), maxConcurrency)
    }

    /**
     * Creates a flow backed by `EmptyFlow`, whose input and output types are
     * both `T`.
     */
    static of<T>(): Flow<T, T> {
      return new Flow(EmptyFlow)
    }

    /**
     * Returns a new flow whose builder is `MappedStreamBuilder(this.builder, f)`.
     *
     * @param f - function from the current output type to the new output type.
     */
    map<O>(f: (v: Output) => O): Flow<Input, O> {
      return new Flow(MappedStreamBuilder(this.builder, f))
    }

    /**
     * Returns a new flow whose builder is `MappedAsyncStreamBuilder` over this
     * flow's builder.
     *
     * @param concurrency - requested concurrency; normalized to an integer >= 1
     *                      before being handed to the builder.
     * @param f - promise-returning function applied by the builder.
     */
    mapAsync<O>(concurrency: number, f: (v: Output) => Promise<O>): Flow<Input, O> {
      return new Flow(MappedAsyncStreamBuilder(this.builder, Flow.normalizeConcurrency(concurrency), f))
    }

    /**
     * Same as {@link Flow.mapAsync} but backed by
     * `MappedAsyncUnorderedStreamBuilder`.
     *
     * @param concurrency - requested concurrency; normalized to an integer >= 1
     *                      before being handed to the builder.
     * @param f - promise-returning function applied by the builder.
     */
    mapAsyncUnordered<O>(concurrency: number, f: (v: Output) => Promise<O>): Flow<Input, O> {
      return new Flow(MappedAsyncUnorderedStreamBuilder(this.builder, Flow.normalizeConcurrency(concurrency), f))
    }

    /**
     * Returns a new flow whose output type is `Output[]`, backed by
     * `BatchedStreamBuilder(this.builder, n)`.
     *
     * @param n - passed to the builder unchanged (presumably the batch size;
     *            no validation happens here).
     */
    grouped(n: number): Flow<Input, Output[]> {
      return new Flow(BatchedStreamBuilder(this.builder, n))
    }

    /**
     * Returns a new flow with the same output type, backed by
     * `FilteredStreamBuilder(this.builder, predicate)`.
     *
     * @param predicate - boolean test handed to the builder.
     */
    filter(predicate: (v: Output) => boolean): Flow<Input, Output> {
      return new Flow(FilteredStreamBuilder(this.builder, predicate))
    }

    /**
     * Returns a new flow backed by
     * `ThrottledStreamBuilder(this.builder, elements, perDurationMs)`.
     *
     * The return type is spelled out: without it `Input` cannot be inferred
     * from the constructor argument and would be lost for later `into` calls.
     *
     * @param elements - passed to the builder unchanged.
     * @param perDurationMs - passed to the builder unchanged.
     */
    throttle(elements: number, perDurationMs: number): Flow<Input, Output> {
      return new Flow(ThrottledStreamBuilder(this.builder, elements, perDurationMs))
    }

    /**
     * Connects this flow to `sink`, producing a sink that accepts `Input`.
     *
     * The resulting builder's `buildStreams()` returns this flow's built stages
     * followed by the sink's own streams (or nothing when the sink's builder
     * has no `buildStreams`). Both are evaluated on each call, not when `into`
     * is called.
     *
     * @param sink - downstream sink consuming this flow's output.
     */
    into<M>(sink: Sink<Output, M>): Sink<Input, M> {
      return {
        __phantom__: Phantom(),
        builder: {
          // Arrow function so `this` is the flow, not the builder literal.
          buildStreams: () => [...this.builder.build(), ...(sink.builder.buildStreams?.() ?? [])],
          // NOTE(review): copied as an unbound reference, so a `buildWritable`
          // that relies on `this` would see this literal instead of
          // `sink.builder` — confirm against the Sink implementations.
          buildWritable: sink.builder.buildWritable,
        },
      }
    }
  }
  /**
   * Stage builder with no stages, used by `Flow.of()` as the starting point of
   * a chain. Each `build()` call returns a new empty array.
   */
  const EmptyFlow: StreamStageBuilder<any> = {
    build: () => [],
  }