Aurélien Richez 5 лет назад
Родитель
Сommit
7533b30846
3 измененных файлов с 91 добавлено и 62 удалено
  1. 34 18
      src/Sink.ts
  2. 13 14
      src/Source.ts
  3. 44 30
      src/streams.test.ts

+ 34 - 18
src/Sink.ts

@@ -1,35 +1,34 @@
-import * as stream from "stream";
+import * as stream from 'stream'
 
 import { Phantom } from './common'
 
-export const Done = Symbol("Done");
-export type Done = typeof Done;
+export const Done = Symbol('Done')
+export type Done = typeof Done
 
 export interface Sink<T, Mat> {
-  builder: SinkBuilder<Mat>;
-  __phanthom__: T;
+  builder: SinkBuilder<Mat>
+  __phanthom__: T
 }
 
 interface SinkBuilder<M> {
-  buildWritable(out: (value: M) => void): stream.Writable;
+  buildWritable(out: (value: M) => void): stream.Writable
 }
 
-
 export const sum: Sink<number, number> = {
   builder: {
     buildWritable(out) {
-      let result = 0;
+      let result = 0
       return new stream.Writable({
         objectMode: true,
         write(v, _, onNext) {
-          result += v;
-          onNext();
+          result += v
+          onNext()
         },
-      }).on("finish", () => out(result));
+      }).on('finish', () => out(result))
     },
   },
   __phanthom__: Phantom(),
-};
+}
 
 export const ignore: Sink<any, Done> = {
   builder: {
@@ -37,13 +36,13 @@ export const ignore: Sink<any, Done> = {
       return new stream.Writable({
         objectMode: true,
         write(v, _, onNext) {
-          onNext();
+          onNext()
         },
-      }).on("finish", () => out(Done));
+      }).on('finish', () => out(Done))
     },
   },
   __phanthom__: Phantom(),
-};
+}
 
 export function reduce<V, Mat>(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Sink<V, Mat> {
   return {
@@ -54,11 +53,28 @@ export function reduce<V, Mat>(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Si
           objectMode: true,
           write(v, _, onNext) {
             acc = reduceFn(acc, v)
-            onNext();
+            onNext()
           },
-        }).on("finish", () => out(acc));
+        }).on('finish', () => out(acc))
       },
     },
     __phanthom__: Phantom(),
   }
-}
+}
+
+export function forEach<T>(fn: (t: T) => void): Sink<T, Done> {
+  return {
+    builder: {
+      buildWritable(out) {
+        return new stream.Writable({
+          objectMode: true,
+          write(v, _, onNext) {
+            fn(v)
+            onNext()
+          },
+        }).on('finish', () => out(Done))
+      },
+    },
+    __phanthom__: Phantom(),
+  }
+}

+ 13 - 14
src/Source.ts

@@ -46,7 +46,7 @@ export class Source<T> {
         ]
         return NodeUtils.promisify(stream.pipeline)(stages).then(() => {
           if (result !== undefined) return result
-          else throw new Error('output function was not called ') // FIXME find a more error prone API ?
+          else throw new Error('output function was not called ') // FIXME find a less error prone API ?
         })
       },
     }
@@ -206,24 +206,23 @@ function FilteredSourceBuilder<T>(prev: SourceBuilder<T>, predicate: (v: T) => b
       return [
         ...prev.build(),
         new stream.Transform({
-          objectMode: true, 
+          objectMode: true,
           transform(v, {}, onNext) {
             try {
-              if(predicate(v)) {
+              if (predicate(v)) {
                 this.push(v)
               }
               onNext()
             } catch (e) {
               onNext(e)
             }
-          }
-        })
+          },
+        }),
       ]
-    }
-  } 
+    },
+  }
 }
 
-
 function ThrottledSourceBuilder<T>(prev: SourceBuilder<T>, elements: number, perDurationMs: number) {
   let timestamps: number[] = []
   return {
@@ -234,17 +233,17 @@ function ThrottledSourceBuilder<T>(prev: SourceBuilder<T>, elements: number, per
           objectMode: true,
           transform(v, {}, onNext) {
             const current = Date.now()
-            timestamps = [...timestamps.filter(t => t > (current - perDurationMs)), current]
+            timestamps = [...timestamps.filter(t => t > current - perDurationMs), current]
             this.push(v)
-            if(timestamps.length >= elements) {
-              const  timeToWait = (timestamps[0] + perDurationMs) - Date.now()
+            if (timestamps.length >= elements) {
+              const timeToWait = timestamps[0] + perDurationMs - Date.now()
               setTimeout(onNext, timeToWait)
             } else {
               onNext()
             }
-          }
-        })
+          },
+        }),
       ]
-    }
+    },
   }
 }

+ 44 - 30
src/streams.test.ts

@@ -8,26 +8,25 @@ describe('Source', function () {
 
   it('should run a simple sum pipeline', async () => {
     await fc.assert(
-      fc.asyncProperty(fc.array<number>(fc.integer()), async (arr) => {
-        const result =
-          await Source.fromArray(arr)
-            .into(Sink.sum)
-            .run()
-        assert.equal(result, arr.reduce((a, b) => a + b, 0))
-      })
+      fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
+        const result = await Source.fromArray(arr).into(Sink.sum).run()
+        assert.equal(
+          result,
+          arr.reduce((a, b) => a + b, 0),
+        )
+      }),
     )
   })
 
   it('should run a simple sum with map', async () => {
     await fc.assert(
-      fc.asyncProperty(fc.array<number>(fc.integer()), async (arr) => {
-        const result =
-          await Source.fromArray<number>(arr)
-            .map(x => x * 2)
-            .into(Sink.sum)
-            .run()
+      fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
+        const result = await Source.fromArray<number>(arr)
+          .map(x => x * 2)
+          .into(Sink.sum)
+          .run()
         assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
-      })
+      }),
     )
   })
 
@@ -42,21 +41,23 @@ describe('Source', function () {
           })
           .into(Sink.sum)
           .run()
-        assert.equal(result, arr.length, "An element is missing")
-      })
+        assert.equal(result, arr.length, 'An element is missing')
+      }),
     )
   })
 
   it('should run ordered async', async () => {
     await fc.assert(
       fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
-        const timeout = () => (1 + (0 | (Math.random() * 4)))
+        const timeout = () => 1 + (0 | (Math.random() * 4))
         const result = await Source.fromArray(arr)
-          .mapAsync(concurrencyLevel, v => { return new Promise(resolve => setTimeout(() => resolve(v), timeout())) })
-          .into(Sink.reduce<number, number[]>((acc, b) => ([...acc, b]), []))
+          .mapAsync(concurrencyLevel, v => {
+            return new Promise(resolve => setTimeout(() => resolve(v), timeout()))
+          })
+          .into(Sink.reduce<number, number[]>((acc, b) => [...acc, b], []))
           .run()
         assert.deepEqual(result, arr)
-      })
+      }),
     )
   })
 
@@ -64,36 +65,49 @@ describe('Source', function () {
     await fc.assert(
       fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
         const result = await Source.fromArray(arr)
-          .mapAsyncUnordered(concurrencyLevel, v => { return new Promise(resolve => setTimeout(() => resolve(v * 2), 2)) })
+          .mapAsyncUnordered(concurrencyLevel, v => {
+            return new Promise(resolve => setTimeout(() => resolve(v * 2), 2))
+          })
           .into(Sink.sum)
           .run()
         assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
-      })
+      }),
     )
   })
 
   it('should filter elements', async () => {
     await fc.assert(
-      fc.asyncProperty(fc.array(fc.integer()), async (arr) => {
+      fc.asyncProperty(fc.array(fc.integer()), async arr => {
         const result = await Source.fromArray(arr)
           .filter(n => n % 2 === 0)
           .into(Sink.sum)
           .run()
         assert.deepEqual(result % 2, 0)
-      })
+      }),
     )
   })
 
   it('should throttle elements', async () => {
     const before = Date.now()
-    const result = await Source.fromArray([1,2,3,4,5])
-      .throttle(2, 10)
-      .into(Sink.sum)
-      .run()
+    const result = await Source.fromArray([1, 2, 3, 4, 5]).throttle(2, 10).into(Sink.sum).run()
     const after = Date.now()
     const duration = after - before
     assert.equal(result, 15)
-    assert.ok(duration> 20, 'must at least be 20ms but was ' + duration)
+    assert.ok(duration > 20, 'must at least be 20ms but was ' + duration)
   })
 
-});
+  it('should run forEach correctly', async () => {
+    await fc.assert(
+      fc.asyncProperty(fc.array(fc.integer()), async arr => {
+        let acc = 0
+        await Source.fromArray(arr)
+          .into(Sink.forEach(n => (acc += n)))
+          .run()
+        assert.strictEqual(
+          acc,
+          arr.reduce((a, b) => a + b, 0),
+        )
+      }),
+    )
+  })
+})