Browse Source

add statefulMapConcat

Aurélien Richez 4 years ago
parent
commit
2ae1dfd6f2
5 changed files with 90 additions and 24 deletions
  1. 3 0
      package.json
  2. 7 0
      src/Source.ts
  3. 31 0
      src/internal/StreamStageBuilder.ts
  4. 0 24
      test/filestream.test.ignored.ts
  5. 49 0
      test/filestream.test.ts

+ 3 - 0
package.json

@@ -6,6 +6,9 @@
   "files": [
     "lib/*"
   ],
+  "engines": {
+    "node": ">14.0.0"
+  },
   "scripts": {
     "test": "mocha -r ts-node/register test/*.test.ts",
     "test:watch": "mocha -r ts-node/register --watch-extensions ts --watch --watch-files src/**/*.ts --watch-files test/**/*.ts --reporter Min  test/*.test.ts",

+ 7 - 0
src/Source.ts

@@ -13,6 +13,7 @@ import {
   builderSymbol,
   FlatMappedStreamBuilder,
   SizeLimitedStreamBuilder,
+  StatefulStreamBuilder,
 } from './internal/StreamStageBuilder'
 
 import { Sink } from './Sink'
@@ -53,6 +54,12 @@ export class Source<T> extends LinearShape<T> {
     return new Source(SizeLimitedStreamBuilder(this.builder, elements))
   }
 
+  statefulMapConcat<O>(mapConcatFunction: () => [(v: T) => O[], () => O[]]): Source<O>
+  statefulMapConcat<O>(mapConcatFunction: () => (v: T) => O[]): Source<O>
+  statefulMapConcat<O>(mapConcatFunction: (() => (v: T) => O[]) | (() => [(v: T) => O[], () => O[]])): Source<O> {
+    return new Source(StatefulStreamBuilder(this.builder, mapConcatFunction))
+  }
+
   via<O>(flow: Flow<T, O>): Source<O> {
     const builder = this.builder
     return new Source({

+ 31 - 0
src/internal/StreamStageBuilder.ts

@@ -172,6 +172,37 @@ export function FilteredStreamBuilder<T>(prev: StreamStageBuilder<T>, predicate:
   }
 }
 
+export function StatefulStreamBuilder<T, O>(
+  prev: StreamStageBuilder<T>,
+  statefulFnBuilder: (() => (v: T) => O[]) | (() => [(v: T) => O[], () => O[]]),
+) {
+  return {
+    build() {
+      const statefulHandlers = statefulFnBuilder()
+      const mapConcat = typeof statefulHandlers === 'function' ? statefulHandlers : statefulHandlers[0]
+      const flush = typeof statefulHandlers === 'function' ? undefined : statefulHandlers[1]
+      return [
+        ...prev.build(),
+        new stream.Transform({
+          objectMode: true,
+          transform(v, {}, onNext) {
+            try {
+              mapConcat(v).forEach(r => this.push(r))
+              onNext()
+            } catch (e) {
+              onNext(e)
+            }
+          },
+          flush(onEnd) {
+            flush?.().forEach(r => this.push(r))
+            onEnd()
+          },
+        }),
+      ]
+    },
+  }
+}
+
 export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements: number, perDurationMs: number) {
   return {
     build() {

+ 0 - 24
test/filestream.test.ignored.ts

@@ -1,24 +0,0 @@
-// import * as assert from 'assert'
-// import { Source } from '../src/Source'
-// import * as Sink from '../src/Sink'
-// import * as fs from 'fs'
-
-// describe('file stream', function () {
-//   this.timeout(5000)
-
-//   // it('should run a stream from a file', async () => {
-//   //   const value = await Source.fromReadableBuilder<Buffer>(() => fs.createReadStream('./resources/test/numbers.txt'))
-//   //     .map(v => v.toString('utf-8').split('\n'))
-//   //     .map(numbers =>
-//   //       numbers
-//   //         .split(/\s+/)
-//   //         .filter(v => v.length > 0)
-//   //         .map(v => parseInt(v, 10))
-//   //         .reduce((a, b) => a + b, 0),
-//   //     )
-//   //     .into(Sink.sum)
-//   //     .run()
-
-//   //   assert.strictEqual(value, 525896510)
-//   // })
-// })

+ 49 - 0
test/filestream.test.ts

@@ -0,0 +1,49 @@
+import * as assert from 'assert'
+import { Source } from '../src/Source'
+import * as Sink from '../src/Sink'
+import * as fs from 'fs'
+
+describe('file stream', function () {
+  this.timeout(5000)
+
+  it('should run a stream from a file', async () => {
+    const value = await Source.fromReadableBuilder<Buffer>(() => fs.createReadStream('./resources/test/numbers.txt'))
+      .map(v => v.toString('utf-8'))
+      .statefulMapConcat<string>(() => {
+        let current: string = ''
+        return [
+          v => {
+            current = current + v
+            if (current.includes('\n')) {
+              const splitted = current.split('\n')
+              current = splitted.pop()!
+              return splitted
+            } else {
+              return []
+            }
+          },
+          () => current.split('\n'),
+        ]
+      })
+      .map((numbers: string) =>
+        numbers
+          .split(/\s/)
+          .filter(v => v.length > 0)
+          .map(v => parseInt(v, 10))
+          .reduce((a, b) => a + b, 0),
+      )
+      .into(Sink.sum)
+      .run()
+
+    assert.strictEqual(
+      value,
+      fs
+        .readFileSync('./resources/test/numbers.txt')
+        .toString('utf-8')
+        .split(/\s/)
+        .filter(s => s.length > 0)
+        .map(v => parseInt(v))
+        .reduce((a, b) => a + b, 0),
+    )
+  })
+})