Aurélien Richez 4 жил өмнө
parent
commit
c1ba8a6c9f

+ 1 - 2
package.json

@@ -9,8 +9,7 @@
   "scripts": {
     "test": "mocha -r ts-node/register test/*.test.ts",
     "test:watch": "mocha -r ts-node/register --watch-extensions ts --watch --watch-files src/**/*.ts --watch-files test/**/*.ts --reporter Min  test/*.test.ts",
-    "prebuild": "npm run clean",
-    "build": "tsc -p tsconfig.json",
+    "build": "yarn install && tsc -p tsconfig.json",
     "build:watch": "tsc -p tsconfig.json --watch",
     "clean": "rimraf lib"
   },

+ 12 - 3
src/Source.ts

@@ -12,6 +12,7 @@ import {
   LinearShape,
   builderSymbol,
   FlatMappedStreamBuilder,
+  SizeLimitedStreamBuilder,
 } from './internal/StreamStageBuilder'
 
 import { Sink } from './Sink'
@@ -48,6 +49,10 @@ export class Source<T> extends LinearShape<T> {
     return new Source(ThrottledStreamBuilder(this.builder, elements, perDurationMs))
   }
 
+  take(elements: number): Source<T> {
+    return new Source(SizeLimitedStreamBuilder(this.builder, elements))
+  }
+
   via<O>(flow: Flow<T, O>): Source<O> {
     const builder = this.builder
     return new Source({
@@ -82,7 +87,11 @@ export class Source<T> extends LinearShape<T> {
   }
 
   static fromArray<T>(a: T[]) {
-    return new Source<T>(IterableSourceBuilder(a))
+    return new Source<T>(IterableSourceBuilder(() => a))
+  }
+
+  static fromIterable<T>(iterableBuilder: () => Iterable<T> | AsyncIterable<T>) {
+    return new Source<T>(IterableSourceBuilder(iterableBuilder))
   }
 
   static fromReadableBuilder<T>(build: () => stream.Readable) {
@@ -90,10 +99,10 @@ export class Source<T> extends LinearShape<T> {
   }
 }
 
-function IterableSourceBuilder<T>(it: Iterable<T>) {
+function IterableSourceBuilder<T>(iterableBuilder: () => Iterable<T> | AsyncIterable<T>) {
   return {
     build() {
-      return [stream.Readable.from(it, { autoDestroy: true })]
+      return [stream.Readable.from(iterableBuilder(), { autoDestroy: true })]
     },
   }
 }

+ 24 - 0
src/internal/StreamStageBuilder.ts

@@ -198,6 +198,30 @@ export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements:
   }
 }
 
+export function SizeLimitedStreamBuilder<T>(prev: StreamStageBuilder<T>, maxElems: number) {
+  return {
+    build() {
+      let count: number = 0
+
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, onNext) {
+            count++
+            this.push(v)
+            if (count >= maxElems) {
+              onNext(undefined, null)
+            } else {
+              onNext()
+            }
+          },
+        }),
+      ]
+    },
+  }
+}
+
 export function FlatMappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, flatMapFn: (v: T) => StreamStageBuilder<U>) {
   return {
     build() {

+ 14 - 0
test/stream.test.ts

@@ -175,4 +175,18 @@ describe('Source', function () {
       .run()
     assert.strictEqual(result, 'aabbddeeff')
   })
+
+  it('should limit the size of a stream with take', async () => {
+    function* infinite() {
+      while (true) {
+        yield 1
+      }
+    }
+
+    await fc.asyncProperty(fc.integer(), async num => {
+      const result = await Source.fromIterable(infinite).take(num).into(Sink.sum).run()
+
+      assert.strictEqual(result, num)
+    })
+  })
 })