Aurélien Richez 4 жил өмнө
parent
commit
4b7cb5b50d

+ 2 - 2
package.json

@@ -8,7 +8,7 @@
   ],
   "scripts": {
     "test": "mocha -r ts-node/register test/*.test.ts",
-    "test:watch": "mocha -r ts-node/register --watch --watch-files src/**/*.ts --reporter Min  test/*.test.ts",
+    "test:watch": "mocha -r ts-node/register --watch-extensions ts --watch --watch-files src/**/*.ts --watch-files test/**/*.ts --reporter Min  test/*.test.ts",
     "prebuild": "npm run clean",
     "build": "tsc -p tsconfig.json",
     "build:watch": "tsc -p tsconfig.json --watch",
@@ -19,7 +19,7 @@
   "dependencies": {},
   "devDependencies": {
     "@types/mocha": "^7.0.2",
-    "@types/node": "^12.12.38",
+    "@types/node": "^14.14.31",
     "fast-check": "^1.24.2",
     "mocha": "^7.1.2",
     "rimraf": "^3.0.2",

+ 5 - 0
src/Source.ts

@@ -11,6 +11,7 @@ import {
   FilteredStreamBuilder,
   LinearShape,
   builderSymbol,
+  FlatMappedStreamBuilder,
 } from './internal/StreamStageBuilder'
 
 import { Sink } from './Sink'
@@ -56,6 +57,10 @@ export class Source<T> extends LinearShape<T> {
     })
   }
 
+  flatMap<O>(f: (v: T) => Source<O>): Source<O> {
+    return new Source(FlatMappedStreamBuilder(this.builder, (v: T) => f(v)[builderSymbol]))
+  }
+
   into<M>(sink: Sink<T, M>) {
     const self = this
     return {

+ 43 - 0
src/internal/StreamStageBuilder.ts

@@ -197,3 +197,46 @@ export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements:
     },
   }
 }
+
+export function FlatMappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, flatMapFn: (v: T) => StreamStageBuilder<U>) {
+  return {
+    build() {
+      let currentMaterializedSource: NodeJS.ReadableStream
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          write(v, {}, onNext) {
+            try {
+              const materializedStreams = flatMapFn(v).build()
+              currentMaterializedSource = materializedStreams[materializedStreams.length - 1]
+              if (materializedStreams.length > 1) {
+                stream.pipeline(materializedStreams, e => {})
+              }
+
+              currentMaterializedSource
+                .on('data', v => {
+                  if (!this.push(v)) {
+                    currentMaterializedSource.pause()
+                  }
+                })
+                .once('close', () => {
+                  onNext()
+                })
+                .once('error', e => {
+                  onNext(e)
+                })
+            } catch (e) {
+              onNext(e)
+            }
+          },
+          read() {
+            if (currentMaterializedSource) {
+              currentMaterializedSource.resume()
+            }
+          },
+        }),
+      ]
+    },
+  }
+}

+ 24 - 0
test/filestream.test.ignored.ts

@@ -0,0 +1,24 @@
+// import * as assert from 'assert'
+// import { Source } from '../src/Source'
+// import * as Sink from '../src/Sink'
+// import * as fs from 'fs'
+
+// describe('file stream', function () {
+//   this.timeout(5000)
+
+//   // it('should run a stream from a file', async () => {
+//   //   const value = await Source.fromReadableBuilder<Buffer>(() => fs.createReadStream('./resources/test/numbers.txt'))
+//   //     .map(v => v.toString('utf-8').split('\n'))
+//   //     .map(numbers =>
+//   //       numbers
+//   //         .split(/\s+/)
+//   //         .filter(v => v.length > 0)
+//   //         .map(v => parseInt(v, 10))
+//   //         .reduce((a, b) => a + b, 0),
+//   //     )
+//   //     .into(Sink.sum)
+//   //     .run()
+
+//   //   assert.strictEqual(value, 525896510)
+//   // })
+// })

+ 0 - 24
test/filestream.test.ts

@@ -1,24 +0,0 @@
-import * as assert from 'assert'
-import { Source } from '../src/Source'
-import * as Sink from '../src/Sink'
-import * as fs from 'fs'
-
-describe('file stream', function () {
-  this.timeout(5000)
-
-  it('should run a stream from a file', async () => {
-    const value = await Source.fromReadableBuilder<Buffer>(() => fs.createReadStream('./resources/test/numbers.txt'))
-      .map(v => v.toString('utf-8'))
-      .map(numbers =>
-        numbers
-          .split(/\s+/)
-          .filter(v => v.length > 0)
-          .map(v => parseInt(v, 10))
-          .reduce((a, b) => a + b, 0),
-      )
-      .into(Sink.sum)
-      .run()
-
-    assert.strictEqual(value, 525896510)
-  })
-})

+ 28 - 2
test/stream.test.ts

@@ -136,7 +136,7 @@ describe('Source', function () {
             .into(Sink.sum),
         )
         .run()
-      assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
+      assert.strictEqual(result, 2 * arr.reduce((a, b) => a + b, 0))
     })
   })
 
@@ -146,7 +146,33 @@ describe('Source', function () {
         .via(Flow.of<number>().map(v => v * 2))
         .into(Sink.sum)
         .run()
-      assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
+      assert.strictEqual(result, 2 * arr.reduce((a, b) => a + b, 0))
     })
   })
+
+  it('should run with simple flatMap', async () => {
+    const result = await Source.fromArray<string[]>([
+      ['a', 'b', 'c'],
+      ['d', 'e', 'f'],
+    ])
+      .flatMap(arr => Source.fromArray(arr))
+      .into(Sink.reduce((a, b) => a + b, ''))
+      .run()
+    assert.strictEqual(result, 'abcdef')
+  })
+
+  it('should run with more complex flatMap', async () => {
+    const result = await Source.fromArray<string[]>([
+      ['a', 'b', 'c'],
+      ['d', 'e', 'f'],
+    ])
+      .flatMap(arr =>
+        Source.fromArray(arr)
+          .filter(v => v != 'c')
+          .map(v => v + v),
+      )
+      .into(Sink.reduce((a, b) => a + b, ''))
+      .run()
+    assert.strictEqual(result, 'aabbddeeff')
+  })
 })