Aurélien Richez 4 سال پیش
والد
کامیت
67a1f1d5ac
7فایلهای تغییر یافته به همراه281 افزوده شده و 194 حذف شده
  1. 1 0
      package.json
  2. 69 0
      src/Flow.ts
  3. 1 0
      src/Sink.ts
  4. 20 193
      src/Source.ts
  5. 0 1
      src/filestream.test.ts
  6. 189 0
      src/internal/StreamStageBuilder.ts
  7. 1 0
      tsconfig.json

+ 1 - 0
package.json

@@ -3,6 +3,7 @@
   "version": "1.0.0",
   "description": "",
   "lib": "index/index.js",
+  "files": ["lib/*"],
   "scripts": {
     "test": "mocha -r ts-node/register src/*.test.ts",
     "test:watch": "mocha -r ts-node/register --watch --watch-files src/**/*.ts --reporter Min  src/*.test.ts",

+ 69 - 0
src/Flow.ts

@@ -0,0 +1,69 @@
+import {
+  StreamStageBuilder,
+  MappedStreamBuilder,
+  MappedAsyncStreamBuilder,
+  MappedAsyncUnorderedStreamBuilder,
+  BatchedStreamBuilder,
+  ThrottledStreamBuilder,
+  FilteredStreamBuilder,
+} from './internal/StreamStageBuilder'
+
+import { Phantom } from './common'
+import type { Sink } from './Sink'
+
+export class Flow<Input, Output> {
+  protected __phantom__!: { input: Input; output: Output }
+
+  private constructor(private builder: StreamStageBuilder<Output>) {}
+
+  static of<T>(): Flow<T, T> {
+    return new Flow(EmptyFlow)
+  }
+
+  map<O>(f: (v: Output) => O): Flow<Input, O> {
+    return new Flow(MappedStreamBuilder(this.builder, f))
+  }
+
+  mapAsync<O>(concurrency: number, f: (v: Output) => Promise<O>): Flow<Input, O> {
+    return new Flow(MappedAsyncStreamBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
+  }
+
+  mapAsyncUnordered<O>(concurrency: number, f: (v: Output) => Promise<O>): Flow<Input, O> {
+    return new Flow(MappedAsyncUnorderedStreamBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
+  }
+
+  grouped(n: number): Flow<Input, Output[]> {
+    return new Flow(BatchedStreamBuilder(this.builder, n))
+  }
+
+  filter(predicate: (v: Output) => boolean): Flow<Input, Output> {
+    return new Flow(FilteredStreamBuilder(this.builder, predicate))
+  }
+
+  throttle(elements: number, perDurationMs: number) {
+    return new Flow(ThrottledStreamBuilder(this.builder, elements, perDurationMs))
+  }
+
+  into<M>(sink: Sink<Output, M>): Sink<Input, M> {
+    const self = this
+    return {
+      __phantom__: Phantom(),
+      builder: {
+        buildStreams() {
+          return [...self.builder.build(), ...(sink.builder.buildStreams?.() ?? [])]
+        },
+        buildWritable: sink.builder.buildWritable,
+      },
+    }
+  }
+}
+
+interface FlowBuilder<Input, Output> {
+  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
+}
+
+const EmptyFlow: FlowBuilder<any, any> = {
+  build() {
+    return []
+  },
+}

+ 1 - 0
src/Sink.ts

@@ -11,6 +11,7 @@ export interface Sink<T, Mat> {
 }
 
 interface SinkBuilder<M> {
+  buildStreams?: () => Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
   buildWritable(out: (value: M) => void): stream.Writable
 }
 

+ 20 - 193
src/Source.ts

@@ -1,36 +1,45 @@
 import * as stream from 'stream'
 import * as NodeUtils from 'util'
-import * as assert from 'assert'
+
+import {
+  StreamStageBuilder,
+  MappedStreamBuilder,
+  MappedAsyncStreamBuilder,
+  MappedAsyncUnorderedStreamBuilder,
+  BatchedStreamBuilder,
+  ThrottledStreamBuilder,
+  FilteredStreamBuilder,
+} from './internal/StreamStageBuilder'
 
 import { Sink } from './Sink'
 
 export class Source<T> {
-  private __phantom__!: T
+  protected __phantom__!: T
 
-  private constructor(private builder: SourceBuilder<T>) {}
+  private constructor(private builder: StreamStageBuilder<T>) {}
 
   map<O>(f: (v: T) => O): Source<O> {
-    return new Source(MappedSourceBuilder(this.builder, f))
+    return new Source(MappedStreamBuilder(this.builder, f))
   }
 
   mapAsync<O>(concurrency: number, f: (v: T) => Promise<O>) {
-    return new Source(MappedAsyncSourceBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
+    return new Source(MappedAsyncStreamBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
   }
 
   mapAsyncUnordered<O>(concurrency: number, f: (v: T) => Promise<O>) {
-    return new Source(MappedAsyncUnorderedSourceBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
+    return new Source(MappedAsyncUnorderedStreamBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
   }
 
   grouped(n: number): Source<T[]> {
-    return new Source(BatchedSourceBuilder(this.builder, n))
+    return new Source(BatchedStreamBuilder(this.builder, n))
   }
 
   filter(predicate: (v: T) => boolean): Source<T> {
-    return new Source(FilteredSourceBuilder(this.builder, predicate))
+    return new Source(FilteredStreamBuilder(this.builder, predicate))
   }
 
   throttle(elements: number, perDurationMs: number) {
-    return new Source(ThrottledSourceBuilder(this.builder, elements, perDurationMs))
+    return new Source(ThrottledStreamBuilder(this.builder, elements, perDurationMs))
   }
 
   into<M>(sink: Sink<T, M>) {
@@ -39,7 +48,8 @@ export class Source<T> {
       run() {
         let result: undefined | M
         const stages = [
-          ...self.builder.build(),
+          ...self.builder.build(), // FIXME sink internals leak here
+          ...(sink.builder.buildStreams ? sink.builder.buildStreams() : []),
           sink.builder.buildWritable((v: M) => {
             result = v
           }),
@@ -61,10 +71,6 @@ export class Source<T> {
   }
 }
 
-interface SourceBuilder<T> {
-  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
-}
-
 function IterableSourceBuilder<T>(it: Iterable<T>) {
   return {
     build() {
@@ -72,182 +78,3 @@ function IterableSourceBuilder<T>(it: Iterable<T>) {
     },
   }
 }
-
-function MappedSourceBuilder<T, U>(prev: SourceBuilder<T>, mapFn: (t: T) => U) {
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, {}, callback) {
-            try {
-              callback(undefined, mapFn(v))
-            } catch (e) {
-              callback(e)
-            }
-          },
-        }),
-      ]
-    },
-  }
-}
-
-function MappedAsyncSourceBuilder<T, U>(prev: SourceBuilder<T>, maxConcurrency: number, mapFn: (t: T) => Promise<U>) {
-  let concurrency = 0
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, {}, callback) {
-            concurrency++
-            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
-            const promise = Promise.resolve(v).then(mapFn)
-            this.push(promise)
-            if (concurrency < maxConcurrency) {
-              callback()
-            }
-            promise.finally(() => {
-              if (concurrency === maxConcurrency) {
-                concurrency--
-                callback()
-              } else {
-                concurrency--
-              }
-            })
-          },
-        }),
-        new stream.Transform({
-          objectMode: true,
-          writableHighWaterMark: maxConcurrency,
-          transform(promise: Promise<U>, {}, callback) {
-            promise.then(result => callback(undefined, result), callback)
-          },
-        }),
-      ]
-    },
-  }
-}
-
-function MappedAsyncUnorderedSourceBuilder<T, U>(
-  prev: SourceBuilder<T>,
-  maxConcurrency: number,
-  mapFn: (t: T) => Promise<U>,
-) {
-  let concurrency = 0
-  return {
-    build() {
-      const promises: Set<Promise<unknown>> = new Set()
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, {}, onNext) {
-            concurrency++
-            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
-            const promise = Promise.resolve(v).then(mapFn)
-            if (concurrency < maxConcurrency) {
-              onNext()
-            }
-            promises.add(
-              promise
-                .then(result => this.push(result), onNext)
-                .finally(() => {
-                  promises.delete(promise)
-                  if (concurrency === maxConcurrency) {
-                    concurrency--
-                    onNext()
-                  } else {
-                    concurrency--
-                  }
-                }),
-            )
-          },
-          flush(onEnd) {
-            Promise.all(promises).then(() => onEnd(), onEnd)
-          },
-        }),
-      ]
-    },
-  }
-}
-
-function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
-  let currentBatch: T[] = []
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, {}, callback) {
-            currentBatch.push(v)
-            if (currentBatch.length == batchSize) {
-              callback(undefined, currentBatch)
-              currentBatch = []
-            } else {
-              callback()
-            }
-          },
-          flush(callback) {
-            if(currentBatch.length > 0) {
-              callback(undefined, currentBatch)
-            } else {
-              callback()
-            }
-          },
-        }),
-      ]
-    },
-  }
-}
-
-function FilteredSourceBuilder<T>(prev: SourceBuilder<T>, predicate: (v: T) => boolean) {
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, {}, onNext) {
-            try {
-              if (predicate(v)) {
-                this.push(v)
-              }
-              onNext()
-            } catch (e) {
-              onNext(e)
-            }
-          },
-        }),
-      ]
-    },
-  }
-}
-
-function ThrottledSourceBuilder<T>(prev: SourceBuilder<T>, elements: number, perDurationMs: number) {
-  let timestamps: number[] = []
-  return {
-    build() {
-      return [
-        ...prev.build(),
-        new stream.Transform({
-          objectMode: true,
-          transform(v, {}, onNext) {
-            const current = Date.now()
-            timestamps = [...timestamps.filter(t => t > current - perDurationMs), current]
-            this.push(v)
-            if (timestamps.length >= elements) {
-              const timeToWait = timestamps[0] + perDurationMs - Date.now()
-              setTimeout(onNext, timeToWait)
-            } else {
-              onNext()
-            }
-          },
-        }),
-      ]
-    },
-  }
-}

+ 0 - 1
src/filestream.test.ts

@@ -1,5 +1,4 @@
 import * as assert from 'assert'
-import * as fc from 'fast-check'
 import { Source } from './Source'
 import * as Sink from './Sink'
 import * as fs from 'fs'

+ 189 - 0
src/internal/StreamStageBuilder.ts

@@ -0,0 +1,189 @@
+import * as stream from 'stream'
+import * as assert from 'assert'
+
+export interface StreamStageBuilder<T> {
+  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
+}
+
+export function MappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, mapFn: (t: T) => U) {
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, callback) {
+            try {
+              callback(undefined, mapFn(v))
+            } catch (e) {
+              callback(e)
+            }
+          },
+        }),
+      ]
+    },
+  }
+}
+
+export function MappedAsyncStreamBuilder<T, U>(
+  prev: StreamStageBuilder<T>,
+  maxConcurrency: number,
+  mapFn: (t: T) => Promise<U>,
+) {
+  let concurrency = 0
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, callback) {
+            concurrency++
+            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
+            const promise = Promise.resolve(v).then(mapFn)
+            this.push(promise)
+            if (concurrency < maxConcurrency) {
+              callback()
+            }
+            promise.finally(() => {
+              if (concurrency === maxConcurrency) {
+                concurrency--
+                callback()
+              } else {
+                concurrency--
+              }
+            })
+          },
+        }),
+        new stream.Transform({
+          objectMode: true,
+          writableHighWaterMark: maxConcurrency,
+          transform(promise: Promise<U>, {}, callback) {
+            promise.then(result => callback(undefined, result), callback)
+          },
+        }),
+      ]
+    },
+  }
+}
+
+export function MappedAsyncUnorderedStreamBuilder<T, U>(
+  prev: StreamStageBuilder<T>,
+  maxConcurrency: number,
+  mapFn: (t: T) => Promise<U>,
+) {
+  let concurrency = 0
+  return {
+    build() {
+      const promises: Set<Promise<unknown>> = new Set()
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, onNext) {
+            concurrency++
+            assert.ok(concurrency <= maxConcurrency, `too much concurrent data (concurrency: ${concurrency})`)
+            const promise = Promise.resolve(v).then(mapFn)
+            if (concurrency < maxConcurrency) {
+              onNext()
+            }
+            promises.add(
+              promise
+                .then(result => this.push(result), onNext)
+                .finally(() => {
+                  promises.delete(promise)
+                  if (concurrency === maxConcurrency) {
+                    concurrency--
+                    onNext()
+                  } else {
+                    concurrency--
+                  }
+                }),
+            )
+          },
+          flush(onEnd) {
+            Promise.all(promises).then(() => onEnd(), onEnd)
+          },
+        }),
+      ]
+    },
+  }
+}
+
+export function BatchedStreamBuilder<T>(prev: StreamStageBuilder<T>, batchSize: number) {
+  let currentBatch: T[] = []
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, callback) {
+            currentBatch.push(v)
+            if (currentBatch.length == batchSize) {
+              callback(undefined, currentBatch)
+              currentBatch = []
+            } else {
+              callback()
+            }
+          },
+          flush(callback) {
+            if (currentBatch.length > 0) {
+              callback(undefined, currentBatch)
+            } else {
+              callback()
+            }
+          },
+        }),
+      ]
+    },
+  }
+}
+
+export function FilteredStreamBuilder<T>(prev: StreamStageBuilder<T>, predicate: (v: T) => boolean) {
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, onNext) {
+            try {
+              if (predicate(v)) {
+                this.push(v)
+              }
+              onNext()
+            } catch (e) {
+              onNext(e)
+            }
+          },
+        }),
+      ]
+    },
+  }
+}
+
+export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements: number, perDurationMs: number) {
+  let timestamps: number[] = []
+  return {
+    build() {
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, onNext) {
+            const current = Date.now()
+            timestamps = [...timestamps.filter(t => t > current - perDurationMs), current]
+            this.push(v)
+            if (timestamps.length >= elements) {
+              const timeToWait = timestamps[0] + perDurationMs - Date.now()
+              setTimeout(onNext, timeToWait)
+            } else {
+              onNext()
+            }
+          },
+        }),
+      ]
+    },
+  }
+}

+ 1 - 0
tsconfig.json

@@ -6,6 +6,7 @@
     "declaration": true,
     "moduleResolution": "node",
     "strict": true,
+    "noUnusedLocals": true,
     "esModuleInterop": true,
     "lib": ["es6"]
   },