Aurélien Richez 5 vuotta sitten
vanhempi
commit
bf835b9eed
5 muutettua tiedostoa jossa 256 lisäystä ja 282 poistoa
  1. 64 0
      src/Sink.ts
  2. 186 0
      src/Source.ts
  3. 3 0
      src/common.ts
  4. 0 281
      src/index.ts
  5. 3 1
      src/streams.test.ts

+ 64 - 0
src/Sink.ts

@@ -0,0 +1,64 @@
+import * as stream from "stream";
+
+import { Phantom } from './common'
+
+export const Done = Symbol("Done");
+export type Done = typeof Done;
+
+export interface Sink<T, Mat> {
+  builder: SinkBuilder<Mat>;
+  __phanthom__: T;
+}
+
+interface SinkBuilder<M> {
+  buildWritable(out: (value: M) => void): stream.Writable;
+}
+
+
+export const sum: Sink<number, number> = {
+  builder: {
+    buildWritable(out) {
+      let result = 0;
+      return new stream.Writable({
+        objectMode: true,
+        write(v, _, onNext) {
+          result += v;
+          onNext();
+        },
+      }).on("finish", () => out(result));
+    },
+  },
+  __phanthom__: Phantom(),
+};
+
+export const ignore: Sink<any, Done> = {
+  builder: {
+    buildWritable(out) {
+      return new stream.Writable({
+        objectMode: true,
+        write(v, _, onNext) {
+          onNext();
+        },
+      }).on("finish", () => out(Done));
+    },
+  },
+  __phanthom__: Phantom(),
+};
+
+export function reduce<V, Mat>(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Sink<V, Mat> {
+  return {
+    builder: {
+      buildWritable(out) {
+        let acc = zero
+        return new stream.Writable({
+          objectMode: true,
+          write(v, _, onNext) {
+            acc = reduceFn(acc, v)
+            onNext();
+          },
+        }).on("finish", () => out(acc));
+      },
+    },
+    __phanthom__: Phantom(),
+  }
+}

+ 186 - 0
src/Source.ts

@@ -0,0 +1,186 @@
+import * as stream from "stream";
+import * as NodeUtils from "util";
+import * as assert from "assert"
+
+import { Sink } from './Sink'
+
+export class Source<T> {
+  private __phanthom__!: T;
+
+  private constructor(private builder: SourceBuilder<T>) { }
+
+  map<O>(f: (v: T) => O): Source<O> {
+    return new Source(MappedSourceBuilder(this.builder, f))
+  }
+
+  mapAsync<O>(concurrency: number, f: (v: T) => Promise<O>) {
+    return new Source(MappedAsyncSourceBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
+  }
+
+  mapAsyncUnordered<O>(concurrency: number, f: (v: T) => Promise<O>) {
+    return new Source(MappedAsyncUnorderedSourceBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
+  }
+
+  grouped(n: number): Source<T[]> {
+    return new Source(
+      BatchedSourceBuilder(this.builder, n)
+    );
+  }
+
+  into<M>(sink: Sink<T, M>) {
+    const self = this;
+    return {
+      run() {
+        let result: undefined | M;
+        const stages = [
+          ...self.builder.build(),
+          sink.builder.buildWritable((v: M) => { result = v; }),
+        ];
+        return NodeUtils.promisify(stream.pipeline)(stages).then(() => {
+          if (result !== undefined) return result;
+          else throw new Error("output function was not called "); // FIXME find a more error prone API ?
+        });
+      },
+    };
+  }
+
+  static fromArray<T>(a: T[]) {
+    return new Source<T>(IterableSourceBuilder(a));
+  }
+}
+
+interface SourceBuilder<T> {
+  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>;
+}
+
+function IterableSourceBuilder<T>(it: Iterable<T>) {
+  return {
+    build() {
+      return [stream.Readable.from(it)];
+    },
+  };
+}
+
+function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, { }, callback) {
+            callback(undefined, mapFn(v));
+          },
+        }),
+      ];
+    },
+  };
+}
+
+function MappedAsyncSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
+  let concurrency = 0
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, { }, callback) {
+            concurrency++;
+            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
+            const promise = Promise.resolve(v).then(mapFn)
+            this.push(promise)
+            if (concurrency < maxConcurrency) {
+              callback()
+            }
+            promise.finally(() => {
+              if (concurrency === maxConcurrency) {
+                concurrency--
+                callback()
+              } else {
+                concurrency--
+              }
+            })
+          },
+        }),
+        new stream.Transform({
+          objectMode: true,
+          writableHighWaterMark: maxConcurrency,
+          transform(promise: Promise<U>, { }, callback) {
+            promise.then(
+              result => callback(undefined, result),
+              callback,
+            )
+          },
+        }),
+      ];
+    },
+  };
+}
+
+
+
+function MappedAsyncUnorderedSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
+  let concurrency = 0
+  return {
+    build() {
+      const promises: Set<Promise<unknown>> = new Set()
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, { }, onNext) {
+            concurrency++;
+            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
+            const promise = Promise.resolve(v).then(mapFn)
+            if (concurrency < maxConcurrency) {
+              onNext()
+            }
+            promises.add(
+              promise
+                .then(result => this.push(result), onNext)
+                .finally(() => {
+                  promises.delete(promise)
+                  if (concurrency === maxConcurrency) {
+                    concurrency--
+                    onNext()
+                  } else {
+                    concurrency--
+                  }
+                })
+            )
+          },
+          flush(onEnd) {
+            Promise.all(promises).then(() => onEnd(), onEnd)
+          }
+        }),
+      ];
+    },
+  };
+}
+
+function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
+  let currentBatch: T[] = [];
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, { }, callback) {
+            currentBatch.push(v);
+            if (currentBatch.length == batchSize) {
+              callback(undefined, currentBatch);
+              currentBatch = [];
+            } else {
+              callback();
+            }
+          },
+          flush(callback) {
+            callback(undefined, currentBatch)
+          }
+        }),
+      ];
+    },
+  };
+}

+ 3 - 0
src/common.ts

@@ -0,0 +1,3 @@
+export function Phantom<T>(): T {
+  return undefined!
+}

+ 0 - 281
src/index.ts

@@ -1,281 +0,0 @@
-import * as stream from "stream";
-import * as NodeUtils from "util";
-import * as assert from "assert"
-
-export const Done = Symbol("Done");
-export type Done = typeof Done;
-
-interface Sink<T, Mat> {
-  builder: SinkBuilder<Mat>;
-  __phanthom__: T;
-}
-
-type RawSource<T> = IterableSource<T> | MappedSource<any, T>;
-
-interface MappedSource<T, V> {
-  _type: "MappedSource";
-  source: RawSource<T>;
-  mapFn: (t: T) => V;
-}
-
-interface IterableSource<T> {
-  _type: "IterableSource";
-  it: Iterable<T>;
-}
-
-type ClosedPipeline<T, M> = {
-  source: Source<T>;
-  sink: Sink<T, M>;
-};
-
-interface SinkBuilder<M> {
-  buildWritable(out: (value: M) => void): stream.Writable;
-}
-
-export namespace Sink {
-  export const sum: Sink<number, number> = {
-    builder: {
-      buildWritable(out) {
-        let result = 0;
-        return new stream.Writable({
-          objectMode: true,
-          write(v, _, onNext) {
-            result += v;
-            onNext();
-          },
-        }).on("finish", () => out(result));
-      },
-    },
-    __phanthom__: Phantom(),
-  };
-
-  export const ignore: Sink<any, Done> = {
-    builder: {
-      buildWritable(out) {
-        return new stream.Writable({
-          objectMode: true,
-          write(v, _, onNext) {
-            onNext();
-          },
-        }).on("finish", () => out(Done));
-      },
-    },
-    __phanthom__: Phantom(),
-  };
-
-  export function reduce<V, Mat>(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Sink<V, Mat> {
-    return {
-      builder: {
-        buildWritable(out) {
-          let acc = zero
-          return new stream.Writable({
-            objectMode: true,
-            write(v, _, onNext) {
-              acc = reduceFn(acc, v)
-              onNext();
-            },
-          }).on("finish", () => out(acc));
-        },
-      },
-      __phanthom__: Phantom(),
-    }
-  }
-}
-
-export class Source<T> {
-  private __phanthom__!: T;
-
-  private constructor(private builder: SourceBuilder<T>) { }
-
-  map<O>(f: (v: T) => O): Source<O> {
-    return new Source(MappedSourceBuilder(this.builder, f))
-  }
-
-  mapAsync<O>(concurrency: number, f: (v: T) => Promise<O>) {
-    return new Source(MappedAsyncSourceBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
-  }
-
-  mapAsyncUnordered<O>(concurrency: number, f: (v: T) => Promise<O>) {
-    return new Source(MappedAsyncUnorderedSourceBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
-  }
-
-  grouped(n: number): Source<T[]> {
-    return new Source(
-      BatchedSourceBuilder(this.builder, n)
-    );
-  }
-
-  into<M>(sink: Sink<T, M>) {
-    const self = this;
-    return {
-      run() {
-        let result: undefined | M;
-        const stages = [
-          ...self.builder.build(),
-          sink.builder.buildWritable((v) => {
-            result = v;
-          }),
-        ];
-        return NodeUtils.promisify(stream.pipeline)(stages).then(() => {
-          if (result !== undefined) return result;
-          else throw new Error("output function was not called "); // FIXME find a more error prone API ?
-        });
-      },
-    };
-  }
-
-  static fromArray<T>(a: T[]) {
-    return new Source<T>(IterableSourceBuilder(a));
-  }
-}
-
-interface SourceBuilder<T> {
-  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>;
-}
-
-function MappedSource<T, U>(
-  source: RawSource<T>,
-  mapFn: (t: T) => U
-): MappedSource<T, U> {
-  return { _type: "MappedSource", source, mapFn };
-}
-
-function IterableSource<T>(it: Iterable<T>): IterableSource<T> {
-  return { _type: "IterableSource", it };
-}
-
-function IterableSourceBuilder<T>(it: Iterable<T>) {
-  return {
-    build() {
-      return [stream.Readable.from(it)];
-    },
-  };
-}
-
-function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, { }, callback) {
-            callback(undefined, mapFn(v));
-          },
-        }),
-      ];
-    },
-  };
-}
-
-function MappedAsyncSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
-  let concurrency = 0
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, { }, callback) {
-            concurrency++;
-            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
-            const promise = Promise.resolve(v).then(mapFn)
-            this.push(promise)
-            if (concurrency < maxConcurrency) {
-              callback()
-            }
-            promise.finally(() => {
-              if (concurrency === maxConcurrency) {
-                concurrency--
-                callback()
-              } else {
-                concurrency--
-              }
-            })
-          },
-        }),
-        new stream.Transform({
-          objectMode: true,
-          writableHighWaterMark: maxConcurrency,
-          transform(promise: Promise<U>, { }, callback) {
-            promise.then(
-              result => callback(undefined, result),
-              callback,
-            )
-          },
-        }),
-      ];
-    },
-  };
-}
-
-
-
-function MappedAsyncUnorderedSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
-  let concurrency = 0
-  return {
-    build() {
-      const promises: Set<Promise<unknown>> = new Set()
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, { }, onNext) {
-            concurrency++;
-            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
-            const promise = Promise.resolve(v).then(mapFn)
-            if (concurrency < maxConcurrency) {
-              onNext()
-            }
-            promises.add(
-              promise
-                .then(result => this.push(result), onNext)
-                .finally(() => {
-                  promises.delete(promise)
-                  if (concurrency === maxConcurrency) {
-                    concurrency--
-                    onNext()
-                  } else {
-                    concurrency--
-                  }
-                })
-            )
-          },
-          flush(onEnd) {
-            Promise.all(promises).then(() => onEnd(), onEnd)
-          }
-        }),
-      ];
-    },
-  };
-}
-
-function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
-  let currentBatch: T[] = [];
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, { }, callback) {
-            currentBatch.push(v);
-            if (currentBatch.length == batchSize) {
-              callback(undefined, currentBatch);
-              currentBatch = [];
-            } else {
-              callback();
-            }
-          },
-          flush(callback) {
-            callback(undefined, currentBatch)
-          }
-        }),
-      ];
-    },
-  };
-}
-
-function Phantom<T>(): T {
-  return undefined!
-}

+ 3 - 1
src/streams.test.ts

@@ -1,6 +1,7 @@
 import * as assert from 'assert'
 import * as fc from 'fast-check'
-import { Source, Sink } from './index'
+import { Source } from './Source'
+import * as Sink from './Sink'
 
 describe('Source', function () {
   this.timeout(5000)
@@ -82,4 +83,5 @@ describe('Source', function () {
       })
     )
   })
+
 });