Browse Source

base streams working

Aurélien Richez 5 years ago
parent
commit
4470e4323a
3 changed files with 245 additions and 6 deletions
  1. 8 2
      package.json
  2. 190 4
      src/index.ts
  3. 47 0
      src/toto.test.ts

+ 8 - 2
package.json

@@ -4,7 +4,8 @@
   "description": "",
   "lib": "index/index.js",
   "scripts": {
-    "test": "echo \"Error: no test specified\" && exit 1",
+    "test": "mocha -r ts-node/register src/*.test.ts",
+    "test:watch": "mocha -r ts-node/register --watch --watch-files src/**/*.ts --reporter Min  src/*.test.ts",
     "build": "npm run clean && tsc && tsc -p tsconfig.json",
     "clean": "rimraf lib"
   },
@@ -12,7 +13,12 @@
   "license": "MPL-2.0",
   "dependencies": {},
   "devDependencies": {
+    "@types/mocha": "^7.0.2",
     "@types/node": "^12.12.38",
-    "rimraf": "^3.0.2"
+    "fast-check": "^1.24.2",
+    "mocha": "^7.1.2",
+    "rimraf": "^3.0.2",
+    "ts-node": "^8.10.1",
+    "typescript": "^3.9.2"
   }
 }

+ 190 - 4
src/index.ts

@@ -1,7 +1,193 @@
+import * as stream from "stream";
+import * as NodeUtils from "util";
 
-interface Source<T> {
-  map<V>(mapper: (t: T) => V): Source<V>
-  runWith<M>(sink: Sink<T, M>): Promise<T>
+export const Done = Symbol("Done");
+export type Done = typeof Done;
+
+interface Sink<T, Mat> {
+  builder: SinkBuilder<Mat>;
+  __phanthom__: T;
+}
+
+type RawSource<T> = IterableSource<T> | MappedSource<any, T>;
+
+interface MappedSource<T, V> {
+  _type: "MappedSource";
+  source: RawSource<T>;
+  mapFn: (t: T) => V;
+}
+
+interface IterableSource<T> {
+  _type: "IterableSource";
+  it: Iterable<T>;
+}
+
+type ClosedPipeline<T, M> = {
+  source: Source<T>;
+  sink: Sink<T, M>;
+};
+
+interface SinkBuilder<M> {
+  buildWritable(out: (value: M) => void): stream.Writable;
+}
+
+export namespace Sink {
+  export const sum: Sink<number, number> = {
+    builder: {
+      buildWritable(out) {
+        let result = 0;
+        return new stream.Writable({
+          objectMode: true,
+          write(v, _, onNext) {
+            result += v;
+            onNext();
+          },
+        }).on("finish", () => out(result));
+      },
+    },
+    __phanthom__: Phantom(),
+  };
+
+  export const ignore: Sink<any, Done> = {
+    builder: {
+      buildWritable(out) {
+        return new stream.Writable({
+          objectMode: true,
+          write(v, _, onNext) {
+            onNext();
+          },
+        }).on("finish", () => out(Done));
+      },
+    },
+    __phanthom__: Phantom(),
+  };
+}
+
+export class Source<T> {
+  private __phanthom__!: T;
+
+  private constructor(private builder: SourceBuilder<T>) {}
+
+  map<O>(f: (v: T) => O): Source<O> {
+    return new Source(MappedSourceBuilder(this.builder, f));
+  }
+
+  grouped(n: number): Source<T[]> {
+    return new Source(
+      BatchedSourceBuilder(this.builder, n)
+    );
+  }
+
+  into<M>(sink: Sink<T, M>) {
+    const self = this;
+    return {
+      run() {
+        let result: undefined | M;
+        const stages = [
+          ...self.builder.build(),
+          sink.builder.buildWritable((v) => {
+            result = v;
+          }),
+        ];
+        return NodeUtils.promisify(stream.pipeline)(stages).then(() => {
+          if (result !== undefined) return result;
+          else throw new Error("output function was not called "); // FIXME find a more error prone API ?
+        });
+      },
+    };
+  }
+
+  static fromArray<T>(a: T[]) {
+    return new Source<T>(IterableSourceBuilder(a));
+  }
 }
 
-interface Sink<T, M> {}
+interface SourceBuilder<T> {
+  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>;
+}
+
+function MappedSource<T, U>(
+  source: RawSource<T>,
+  mapFn: (t: T) => U
+): MappedSource<T, U> {
+  return { _type: "MappedSource", source, mapFn };
+}
+
+function IterableSource<T>(it: Iterable<T>): IterableSource<T> {
+  return { _type: "IterableSource", it };
+}
+
+function IterableSourceBuilder<T>(it: Iterable<T>) {
+  return {
+    build() {
+      return [stream.Readable.from(it)];
+    },
+  };
+}
+
+function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, callback) {
+            callback(undefined, mapFn(v));
+          },
+        }),
+      ];
+    },
+  };
+}
+
+function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
+  let currentBatch: T[] = [];
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, callback) {
+            currentBatch.push(v);
+            if (currentBatch.length == batchSize) {
+              callback(undefined, currentBatch);
+              currentBatch = [];
+            } else {
+              callback();
+            }
+          },
+          flush(callback) {
+            callback(undefined, currentBatch)
+          }
+        }),
+      ];
+    },
+  };
+}
+
+function buildStreams<T>(
+  r: RawSource<T>
+): Array<
+  NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream
+> {
+  switch (r._type) {
+    case "IterableSource":
+      return [stream.Readable.from(r.it)];
+    case "MappedSource":
+      return [
+        ...buildStreams(r.source),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, callback) {
+            callback(undefined, r.mapFn(v));
+          },
+        }),
+      ];
+  }
+}
+
+function Phantom<T>(): T {
+  return undefined!;
+}

+ 47 - 0
src/toto.test.ts

@@ -0,0 +1,47 @@
+import * as assert from 'assert'
+import * as fc from 'fast-check'
+import { Source, Sink } from './index'
+
+describe('Source', function() {
+
+    it('should run a simple sum pipeline', async () => {
+      await fc.assert(
+          fc.asyncProperty(fc.array<number>(fc.integer()), async  (arr) => {
+          const result = 
+            await Source.fromArray(arr)
+              .into(Sink.sum)
+              .run()
+              assert.equal(result, arr.reduce((a,b) => a + b, 0))
+        })
+      )
+    })
+
+    it('should run a simple sum with map', async () => {
+      await fc.assert(
+          fc.asyncProperty(fc.array<number>(fc.integer()), async  (arr) => {
+          const result = 
+            await Source.fromArray<number>(arr)
+              .map(x => x * 2)
+              .into(Sink.sum)
+              .run()
+          assert.equal(result, 2 * arr.reduce((a,b) => a + b, 0))
+        })
+      )
+    })
+
+    it('should run a grouped without missing any element', async () => {
+      await fc.assert(
+          fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async  (arr, groupSize) => {
+            const result = await Source.fromArray(arr)
+                .grouped(groupSize)
+                .map(v => { 
+                  assert.ok(v.length <= groupSize)
+                  return v.length
+                })
+                .into(Sink.sum)
+                .run()
+            assert.equal(result, arr.length, "An element is missing")
+        })
+      )
+    })
+});