浏览代码

Add filter

Aurélien Richez 5 年之前
父节点
当前提交
ebba18fd84
共有 3 个文件被更改,包括 105 次插入52 次删除
  1. 7 0
      .prettierrc
  2. 86 52
      src/Source.ts
  3. 12 0
      src/streams.test.ts

+ 7 - 0
.prettierrc

@@ -0,0 +1,7 @@
+{
+  "printWidth": 120,
+  "trailingComma": "all",
+  "semi": false,
+  "arrowParens": "avoid",
+  "singleQuote": true
+}

+ 86 - 52
src/Source.ts

@@ -1,13 +1,13 @@
-import * as stream from "stream";
-import * as NodeUtils from "util";
-import * as assert from "assert"
+import * as stream from 'stream'
+import * as NodeUtils from 'util'
+import * as assert from 'assert'
 
 import { Sink } from './Sink'
 
 export class Source<T> {
-  private __phanthom__!: T;
+  private __phanthom__!: T
 
-  private constructor(private builder: SourceBuilder<T>) { }
+  private constructor(private builder: SourceBuilder<T>) {}
 
   map<O>(f: (v: T) => O): Source<O> {
     return new Source(MappedSourceBuilder(this.builder, f))
@@ -22,43 +22,51 @@ export class Source<T> {
   }
 
   grouped(n: number): Source<T[]> {
-    return new Source(
-      BatchedSourceBuilder(this.builder, n)
-    );
+    return new Source(BatchedSourceBuilder(this.builder, n))
+  }
+
+  filter(predicate: (v: T) => boolean): Source<T> {
+    return new Source(FilteredSourceBuilder(this.builder, predicate))
   }
 
   into<M>(sink: Sink<T, M>) {
-    const self = this;
+    const self = this
     return {
       run() {
-        let result: undefined | M;
+        let result: undefined | M
         const stages = [
           ...self.builder.build(),
-          sink.builder.buildWritable((v: M) => { result = v; }),
-        ];
+          sink.builder.buildWritable((v: M) => {
+            result = v
+          }),
+        ]
         return NodeUtils.promisify(stream.pipeline)(stages).then(() => {
-          if (result !== undefined) return result;
-          else throw new Error("output function was not called "); // FIXME find a more error prone API ?
-        });
+          if (result !== undefined) return result
+          else throw new Error('output function was not called ') // FIXME find a more error prone API ?
+        })
       },
-    };
+    }
   }
 
   static fromArray<T>(a: T[]) {
-    return new Source<T>(IterableSourceBuilder(a));
+    return new Source<T>(IterableSourceBuilder(a))
+  }
+
+  static fromReadableBuilder<T>(build: () => stream.Readable) {
+    return new Source<T>({ build: () => [build()] })
   }
 }
 
 interface SourceBuilder<T> {
-  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>;
+  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
 }
 
 function IterableSourceBuilder<T>(it: Iterable<T>) {
   return {
     build() {
-      return [stream.Readable.from(it)];
+      return [stream.Readable.from(it)]
     },
-  };
+  }
 }
 
 function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
@@ -68,13 +76,17 @@ function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
         ...prev.build(),
         new stream.Transform({
           objectMode: true,
-          transform(v, { }, callback) {
-            callback(undefined, mapFn(v));
+          transform(v, {}, callback) {
+            try {
+              callback(undefined, mapFn(v))
+            } catch (e) {
+              callback(e)
+            }
           },
         }),
-      ];
+      ]
     },
-  };
+  }
 }
 
 function MappedAsyncSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
@@ -85,8 +97,8 @@ function MappedAsyncSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency:
         ...prev.build(),
         new stream.Transform({
           objectMode: true,
-          transform(v, { }, callback) {
-            concurrency++;
+          transform(v, {}, callback) {
+            concurrency++
             assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
             const promise = Promise.resolve(v).then(mapFn)
             this.push(promise)
@@ -106,21 +118,20 @@ function MappedAsyncSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency:
         new stream.Transform({
           objectMode: true,
           writableHighWaterMark: maxConcurrency,
-          transform(promise: Promise<U>, { }, callback) {
-            promise.then(
-              result => callback(undefined, result),
-              callback,
-            )
+          transform(promise: Promise<U>, {}, callback) {
+            promise.then(result => callback(undefined, result), callback)
           },
         }),
-      ];
+      ]
     },
-  };
+  }
 }
 
-
-
-function MappedAsyncUnorderedSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
+function MappedAsyncUnorderedSourceBuilder<T, U>(
+  prev: SourceBuilder<T>,
+  maxConcurrency: number,
+  mapFn: (t: T) => Promise<U>,
+) {
   let concurrency = 0
   return {
     build() {
@@ -129,8 +140,8 @@ function MappedAsyncUnorderedSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConc
         ...prev.build(),
         new stream.Transform({
           objectMode: true,
-          transform(v, { }, onNext) {
-            concurrency++;
+          transform(v, {}, onNext) {
+            concurrency++
             assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
             const promise = Promise.resolve(v).then(mapFn)
             if (concurrency < maxConcurrency) {
@@ -147,40 +158,63 @@ function MappedAsyncUnorderedSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConc
                   } else {
                     concurrency--
                   }
-                })
+                }),
             )
           },
           flush(onEnd) {
             Promise.all(promises).then(() => onEnd(), onEnd)
-          }
+          },
         }),
-      ];
+      ]
     },
-  };
+  }
 }
 
 function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
-  let currentBatch: T[] = [];
+  let currentBatch: T[] = []
   return {
     build() {
       return [
         ...prev.build(),
         new stream.Transform({
           objectMode: true,
-          transform(v, { }, callback) {
-            currentBatch.push(v);
+          transform(v, {}, callback) {
+            currentBatch.push(v)
             if (currentBatch.length == batchSize) {
-              callback(undefined, currentBatch);
-              currentBatch = [];
+              callback(undefined, currentBatch)
+              currentBatch = []
             } else {
-              callback();
+              callback()
             }
           },
           flush(callback) {
             callback(undefined, currentBatch)
-          }
+          },
         }),
-      ];
+      ]
     },
-  };
-}
+  }
+}
+
+function FilteredSourceBuilder<T>(prev: SourceBuilder<T>, predicate: (v: T) => boolean) {
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true, 
+          transform(v, {}, onNext) {
+            try {
+              if(predicate(v)) {
+                this.push(v)
+              }
+              onNext()
+            } catch (e) {
+              onNext(e)
+            }
+          }
+        })
+      ]
+    }
+  } 
+}

+ 12 - 0
src/streams.test.ts

@@ -84,4 +84,16 @@ describe('Source', function () {
     )
   })
 
+  it('should filter elements', async () => {
+    await fc.assert(
+      fc.asyncProperty(fc.array(fc.integer()), async (arr) => {
+        const result = await Source.fromArray(arr)
+          .filter(n => n % 2 === 0)
+          .into(Sink.sum)
+          .run()
+        assert.deepEqual(result % 2, 0)
+      })
+    )
+  })
+
 });