浏览代码

add mapAsync

Aurélien Richez 5 年之前
父节点
当前提交
ec63f8888d
共有 3 个文件被更改，包括 199 次插入和 73 次删除
  1. 114 26
      src/index.ts
  2. 85 0
      src/streams.test.ts
  3. 0 47
      src/toto.test.ts

+ 114 - 26
src/index.ts

@@ -1,5 +1,6 @@
 import * as stream from "stream";
 import * as NodeUtils from "util";
+import * as assert from "assert"
 
 export const Done = Symbol("Done");
 export type Done = typeof Done;
@@ -61,15 +62,41 @@ export namespace Sink {
     },
     __phanthom__: Phantom(),
   };
+
+  /**
+   * Creates a sink that folds every incoming element into a single accumulated value.
+   *
+   * The accumulator starts at `zero` for each writable that is built, is updated with
+   * `reduceFn(acc, v)` for every element written, and is handed to `out` once the
+   * writable emits "finish".
+   *
+   * @param reduceFn combines the current accumulator with the next element
+   * @param zero initial accumulator value (also the result when no element is written)
+   * @returns a sink description whose builder creates the reducing writable
+   */
+  export function reduce<V, Mat>(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Sink<V, Mat> {
+    return {
+      builder: {
+        buildWritable(out) {
+          let acc = zero
+          return new stream.Writable({
+            objectMode: true,
+            write(v, _encoding, onNext) {
+              // A throw from inside write() is not turned into a stream error by Node,
+              // so report failures of reduceFn through the write callback instead.
+              try {
+                acc = reduceFn(acc, v)
+              } catch (err) {
+                onNext(err instanceof Error ? err : new Error(String(err)))
+                return
+              }
+              onNext();
+            },
+          }).on("finish", () => out(acc));
+        },
+      },
+      __phanthom__: Phantom(),
+    }
+  }
 }
 
 export class Source<T> {
   private __phanthom__!: T;
 
-  private constructor(private builder: SourceBuilder<T>) {}
+  private constructor(private builder: SourceBuilder<T>) { }
 
   map<O>(f: (v: T) => O): Source<O> {
-    return new Source(MappedSourceBuilder(this.builder, f));
+    return new Source(MappedSourceBuilder(this.builder, f))
+  }
+
+  /**
+   * Maps every element through the asynchronous function `f`, running at most
+   * `concurrency` calls at the same time. The stage is built by
+   * `MappedAsyncSourceBuilder`.
+   *
+   * @param concurrency maximum number of in-flight calls; fractional values are
+   *   truncated, values below 1 (and NaN) become 1, and Infinity or very large
+   *   values are capped at `Number.MAX_SAFE_INTEGER`
+   * @param f asynchronous mapping function
+   * @returns a new source emitting the resolved values
+   */
+  mapAsync<O>(concurrency: number, f: (v: T) => Promise<O>): Source<O> {
+    // `0 | concurrency` converted NaN and Infinity to 0 and wrapped values above
+    // 2^31 - 1, which made the builder's concurrency assertion fail on the first
+    // element. Math.trunc gives the same result for every ordinary value.
+    const maxConcurrency = Number.isNaN(concurrency)
+      ? 1
+      : Math.min(Number.MAX_SAFE_INTEGER, Math.max(1, Math.trunc(concurrency)))
+    return new Source(MappedAsyncSourceBuilder(this.builder, maxConcurrency, f))
+  }
+
+  /**
+   * Maps every element through the asynchronous function `f`, running at most
+   * `concurrency` calls at the same time. The stage is built by
+   * `MappedAsyncUnorderedSourceBuilder`, which emits each result as soon as its
+   * promise resolves.
+   *
+   * @param concurrency maximum number of in-flight calls; fractional values are
+   *   truncated, values below 1 (and NaN) become 1, and Infinity or very large
+   *   values are capped at `Number.MAX_SAFE_INTEGER`
+   * @param f asynchronous mapping function
+   * @returns a new source emitting the resolved values
+   */
+  mapAsyncUnordered<O>(concurrency: number, f: (v: T) => Promise<O>): Source<O> {
+    // `0 | concurrency` converted NaN and Infinity to 0 and wrapped values above
+    // 2^31 - 1, which made the builder's concurrency assertion fail on the first
+    // element. Math.trunc gives the same result for every ordinary value.
+    const maxConcurrency = Number.isNaN(concurrency)
+      ? 1
+      : Math.min(Number.MAX_SAFE_INTEGER, Math.max(1, Math.trunc(concurrency)))
+    return new Source(MappedAsyncUnorderedSourceBuilder(this.builder, maxConcurrency, f))
   }
 
   grouped(n: number): Source<T[]> {
@@ -132,7 +159,7 @@ function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
         ...prev.build(),
         new stream.Transform({
           objectMode: true,
-          transform(v, {}, callback) {
+          transform(v, { }, callback) {
             callback(undefined, mapFn(v));
           },
         }),
@@ -141,6 +168,88 @@ function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
   };
 }
 
+/**
+ * Builds a source stage that applies the asynchronous `mapFn` to every upstream
+ * element while keeping the results in input order.
+ *
+ * The stage is made of two transforms: the first starts `mapFn` for each element and
+ * pushes the resulting promise downstream immediately (so order is fixed at that
+ * point), the second waits for each promise in turn and emits its value.
+ *
+ * @param prev builder of the upstream stages; its streams come first in the result
+ * @param maxConcurrency maximum number of `mapFn` promises that may be unsettled
+ * @param mapFn asynchronous mapping function
+ * @returns a builder whose `build()` yields the upstream streams followed by the two transforms
+ */
+function MappedAsyncSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
+  return {
+    build() {
+      // State is created per build so that two materialisations of the same
+      // builder never share (and corrupt) each other's concurrency count.
+      let inFlight = 0
+      // Callback of the write that was held back because the limit was reached.
+      let resumeUpstream: (() => void) | undefined
+      const release = () => {
+        inFlight--
+        const resume = resumeUpstream
+        resumeUpstream = undefined
+        if (resume) {
+          resume()
+        }
+      }
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, _encoding, callback) {
+            inFlight++;
+            assert.ok(inFlight <= maxConcurrency, `too much concurrent data (concurrency: ${inFlight})`)
+            // Promise.resolve(...).then also captures a synchronous throw from mapFn.
+            const promise = Promise.resolve(v).then(mapFn)
+            this.push(promise)
+            if (inFlight < maxConcurrency) {
+              callback()
+            } else {
+              // Limit reached: stop accepting input until one promise settles.
+              resumeUpstream = callback
+            }
+            // Two-argument then() instead of finally(): finally() returns a promise
+            // that rejects again, which nobody handled. This also marks `promise`
+            // as handled while it waits in the next stage's buffer; the rejection
+            // itself is still reported by that stage.
+            promise.then(release, release)
+          },
+        }),
+        new stream.Transform({
+          objectMode: true,
+          // Lets up to maxConcurrency pending promises queue up in front of this stage.
+          writableHighWaterMark: maxConcurrency,
+          transform(promise: Promise<U>, _encoding, callback) {
+            promise.then(
+              result => callback(undefined, result),
+              callback,
+            )
+          },
+        }),
+      ];
+    },
+  };
+}
+
+
+
+/**
+ * Builds a source stage that applies the asynchronous `mapFn` to every upstream
+ * element and emits each result as soon as its promise resolves, so the output
+ * order can differ from the input order.
+ *
+ * @param prev builder of the upstream stages; its streams come first in the result
+ * @param maxConcurrency maximum number of `mapFn` promises that may be unsettled
+ * @param mapFn asynchronous mapping function
+ * @returns a builder whose `build()` yields the upstream streams followed by one transform
+ */
+function MappedAsyncUnorderedSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
+  return {
+    build() {
+      // State is created per build so that two materialisations of the same
+      // builder never share each other's concurrency count.
+      let inFlight = 0
+      // Callback of the write that was held back because the limit was reached.
+      let resumeUpstream: (() => void) | undefined
+      // Work that has not finished yet; flush() waits for all of it.
+      const pending: Set<Promise<void>> = new Set()
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, _encoding, onNext) {
+            inFlight++;
+            assert.ok(inFlight <= maxConcurrency, `too much concurrent data (concurrency: ${inFlight})`)
+            if (inFlight < maxConcurrency) {
+              onNext()
+            } else {
+              // Limit reached: stop accepting input until one promise settles.
+              resumeUpstream = onNext
+            }
+            const settled: Promise<void> = Promise.resolve(v)
+              .then(mapFn)
+              .then(
+                result => {
+                  if (!this.destroyed) {
+                    this.push(result)
+                  }
+                },
+                // Fail the stream directly: the write callback may already have
+                // been used, and invoking it a second time is an error in itself.
+                err => {
+                  this.destroy(err)
+                },
+              )
+              .then(() => {
+                // Remove the very promise that was added, so the set does not
+                // grow with every element.
+                pending.delete(settled)
+                inFlight--
+                const resume = resumeUpstream
+                resumeUpstream = undefined
+                if (resume) {
+                  resume()
+                }
+              })
+            pending.add(settled)
+          },
+          flush(onEnd) {
+            // Do not end the readable side before every in-flight result is pushed.
+            Promise.all(pending).then(() => onEnd(), onEnd)
+          }
+        }),
+      ];
+    },
+  };
+}
+
 function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
   let currentBatch: T[] = [];
   return {
@@ -149,7 +258,7 @@ function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
         ...prev.build(),
         new stream.Transform({
           objectMode: true,
-          transform(v, {}, callback) {
+          transform(v, { }, callback) {
             currentBatch.push(v);
             if (currentBatch.length == batchSize) {
               callback(undefined, currentBatch);
@@ -167,27 +276,6 @@ function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
   };
 }
 
-function buildStreams<T>(
-  r: RawSource<T>
-): Array<
-  NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream
-> {
-  switch (r._type) {
-    case "IterableSource":
-      return [stream.Readable.from(r.it)];
-    case "MappedSource":
-      return [
-        ...buildStreams(r.source),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, {}, callback) {
-            callback(undefined, r.mapFn(v));
-          },
-        }),
-      ];
-  }
-}
-
 function Phantom<T>(): T {
-  return undefined!;
+  return undefined!
 }

+ 85 - 0
src/streams.test.ts

@@ -0,0 +1,85 @@
+import * as assert from 'assert'
+import * as fc from 'fast-check'
+import { Source, Sink } from './index'
+
+describe('Source', function () {
+  this.timeout(5000)
+
+  // Reference implementation the pipelines are compared against.
+  const total = (values: number[]): number => values.reduce((a, b) => a + b, 0)
+
+  // Resolves with `value` once `ms` milliseconds have elapsed.
+  const later = <V>(value: V, ms: number): Promise<V> =>
+    new Promise(resolve => setTimeout(() => resolve(value), ms))
+
+  it('should run a simple sum pipeline', async () => {
+    const sumsLikeReduce = fc.asyncProperty(fc.array<number>(fc.integer()), async (arr) => {
+      const result = await Source.fromArray(arr).into(Sink.sum).run()
+      assert.equal(result, total(arr))
+    })
+    await fc.assert(sumsLikeReduce)
+  })
+
+  it('should run a simple sum with map', async () => {
+    const doublesThenSums = fc.asyncProperty(fc.array<number>(fc.integer()), async (arr) => {
+      const doubled = Source.fromArray<number>(arr).map(x => x * 2)
+      const result = await doubled.into(Sink.sum).run()
+      assert.equal(result, 2 * total(arr))
+    })
+    await fc.assert(doublesThenSums)
+  })
+
+  it('should run a grouped without missing any element', async () => {
+    const keepsEveryElement = fc.asyncProperty(
+      fc.array(fc.constant(1)),
+      fc.integer(1, 10),
+      async (arr, groupSize) => {
+        // Each group is replaced by its size, so the sum must equal the input length.
+        const groupLength = (group: number[]) => {
+          assert.ok(group.length <= groupSize)
+          return group.length
+        }
+        const result = await Source.fromArray(arr)
+          .grouped(groupSize)
+          .map(groupLength)
+          .into(Sink.sum)
+          .run()
+        assert.equal(result, arr.length, "An element is missing")
+      },
+    )
+    await fc.assert(keepsEveryElement)
+  })
+
+  it('should run async computations', async () => {
+    const doublesAsynchronously = fc.asyncProperty(fc.array(fc.integer()), async (arr) => {
+      const result = await Source.fromArray(arr)
+        .mapAsync(50, v => later(v * 2, 2))
+        .into(Sink.sum)
+        .run()
+      assert.equal(result, 2 * total(arr))
+    })
+    await fc.assert(doublesAsynchronously)
+  })
+
+  it('should run unordered async computations', async () => {
+    const doublesInAnyOrder = fc.asyncProperty(
+      fc.array(fc.integer()),
+      fc.integer(1, 50),
+      async (arr, concurrencyLevel) => {
+        const result = await Source.fromArray(arr)
+          .mapAsyncUnordered(concurrencyLevel, v => later(v * 2, 2))
+          .into(Sink.sum)
+          .run()
+        assert.equal(result, 2 * total(arr))
+      },
+    )
+    await fc.assert(doublesInAnyOrder)
+  })
+
+  it('should run async computations while keeping order', async () => {
+    const preservesOrder = fc.asyncProperty(
+      fc.array(fc.integer()),
+      fc.integer(1, 50),
+      async (arr, concurrencyLevel) => {
+        // Random delay of 1 to 4 ms so that later elements can finish first.
+        const timeout = () => 1 + (0 | (Math.random() * 4))
+        const collect = Sink.reduce<number, number[]>((acc, b) => [...acc, b], [])
+        const result = await Source.fromArray(arr)
+          .mapAsync(concurrencyLevel, v => later(v, timeout()))
+          .into(collect)
+          .run()
+        assert.deepEqual(result, arr)
+      },
+    )
+    await fc.assert(preservesOrder)
+  })
+});

+ 0 - 47
src/toto.test.ts

@@ -1,47 +0,0 @@
-import * as assert from 'assert'
-import * as fc from 'fast-check'
-import { Source, Sink } from './index'
-
-describe('Source', function() {
-
-    it('should run a simple sum pipeline', async () => {
-      await fc.assert(
-          fc.asyncProperty(fc.array<number>(fc.integer()), async  (arr) => {
-          const result = 
-            await Source.fromArray(arr)
-              .into(Sink.sum)
-              .run()
-              assert.equal(result, arr.reduce((a,b) => a + b, 0))
-        })
-      )
-    })
-
-    it('should run a simple sum with map', async () => {
-      await fc.assert(
-          fc.asyncProperty(fc.array<number>(fc.integer()), async  (arr) => {
-          const result = 
-            await Source.fromArray<number>(arr)
-              .map(x => x * 2)
-              .into(Sink.sum)
-              .run()
-          assert.equal(result, 2 * arr.reduce((a,b) => a + b, 0))
-        })
-      )
-    })
-
-    it('should run a grouped without missing any element', async () => {
-      await fc.assert(
-          fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async  (arr, groupSize) => {
-            const result = await Source.fromArray(arr)
-                .grouped(groupSize)
-                .map(v => { 
-                  assert.ok(v.length <= groupSize)
-                  return v.length
-                })
-                .into(Sink.sum)
-                .run()
-            assert.equal(result, arr.length, "An element is missing")
-        })
-      )
-    })
-});