Aurélien Richez 4 년 전
부모
커밋
a2bb6443b1
4개의 변경된 파일55개의 추가작업 그리고 5개의 파일을 삭제
  1. 5 2
      src/Flow.ts
  2. 17 3
      src/Source.ts
  3. 9 0
      src/internal/StreamStageBuilder.ts
  4. 24 0
      test/stream.test.ts

+ 5 - 2
src/Flow.ts

@@ -6,15 +6,18 @@ import {
   BatchedStreamBuilder,
   ThrottledStreamBuilder,
   FilteredStreamBuilder,
+  LinearShape,
 } from './internal/StreamStageBuilder'
 
 import { Phantom } from './internal/common'
 import type { Sink } from './Sink'
 
-export class Flow<Input, Output> {
+export class Flow<Input, Output> extends LinearShape<Output> {
   protected __phantom__!: { input: Input; output: Output }
 
-  private constructor(private builder: StreamStageBuilder<Output>) {}
+  private constructor(protected builder: StreamStageBuilder<Output>) {
+    super(builder)
+  }
 
   static of<T>(): Flow<T, T> {
     return new Flow(EmptyFlow)

+ 17 - 3
src/Source.ts

@@ -9,14 +9,19 @@ import {
   BatchedStreamBuilder,
   ThrottledStreamBuilder,
   FilteredStreamBuilder,
+  LinearShape,
+  builderSymbol,
 } from './internal/StreamStageBuilder'
 
 import { Sink } from './Sink'
+import { Flow } from './Flow'
 
-export class Source<T> {
+export class Source<T> extends LinearShape<T> {
   protected __phantom__!: T
 
-  private constructor(private builder: StreamStageBuilder<T>) {}
+  private constructor(protected builder: StreamStageBuilder<T>) {
+    super(builder)
+  }
 
   map<O>(f: (v: T) => O): Source<O> {
     return new Source(MappedStreamBuilder(this.builder, f))
@@ -38,10 +43,19 @@ export class Source<T> {
     return new Source(FilteredStreamBuilder(this.builder, predicate))
   }
 
-  throttle(elements: number, perDurationMs: number) {
+  throttle(elements: number, perDurationMs: number): Source<T> {
     return new Source(ThrottledStreamBuilder(this.builder, elements, perDurationMs))
   }
 
+  via<O>(flow: Flow<T, O>): Source<O> {
+    const builder = this.builder
+    return new Source({
+      build() {
+        return [...builder.build(), ...flow[builderSymbol].build()]
+      },
+    })
+  }
+
   into<M>(sink: Sink<T, M>) {
     const self = this
     return {

+ 9 - 0
src/internal/StreamStageBuilder.ts

@@ -1,6 +1,15 @@
 import * as stream from 'stream'
 import * as assert from 'assert'
 
+export const builderSymbol = Symbol('builder')
+export abstract class LinearShape<T> {
+  [builderSymbol]: StreamStageBuilder<T>
+
+  protected constructor(_builder: StreamStageBuilder<T>) {
+    this[builderSymbol] = _builder
+  }
+}
+
 export interface StreamStageBuilder<T> {
   build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
 }

+ 24 - 0
test/streams.test.ts → test/stream.test.ts

@@ -2,6 +2,7 @@ import * as assert from 'assert'
 import * as fc from 'fast-check'
 import { Source } from '../src/Source'
 import * as Sink from '../src/Sink'
+import { Flow } from '../src/Flow'
 
 describe('Source', function () {
   this.timeout(5000)
@@ -125,4 +126,27 @@ describe('Source', function () {
       }),
     )
   })
+
+  it('should run a flow correctly into a sink', async () => {
+    await fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
+      const result = await Source.fromArray<number>(arr)
+        .into(
+          Flow.of<number>()
+            .map(v => v * 2)
+            .into(Sink.sum),
+        )
+        .run()
+      assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
+    })
+  })
+
+  it('should run a flow with `via`', async () => {
+    await fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
+      const result = await Source.fromArray<number>(arr)
+        .via(Flow.of<number>().map(v => v * 2))
+        .into(Sink.sum)
+        .run()
+      assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
+    })
+  })
 })