Jelajahi Sumber

fix flatmap

Aurélien Richez 4 tahun lalu
induk
melakukan
0d094c4f1c
3 mengubah file dengan 38 tambahan dan 21 penghapusan
  1. 1 5
      src/Flow.ts
  2. 1 1
      src/Source.ts
  3. 36 15
      src/internal/StreamStageBuilder.ts

+ 1 - 5
src/Flow.ts

@@ -61,11 +61,7 @@ export class Flow<Input, Output> extends LinearShape<Output> {
   }
 }
 
-interface FlowBuilder<Input, Output> {
-  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
-}
-
-const EmptyFlow: FlowBuilder<any, any> = {
+const EmptyFlow: StreamStageBuilder<any> = {
   build() {
     return []
   },

+ 1 - 1
src/Source.ts

@@ -93,7 +93,7 @@ export class Source<T> extends LinearShape<T> {
 function IterableSourceBuilder<T>(it: Iterable<T>) {
   return {
     build() {
-      return [stream.Readable.from(it)]
+      return [stream.Readable.from(it, { autoDestroy: true })]
     },
   }
 }

+ 36 - 15
src/internal/StreamStageBuilder.ts

@@ -11,7 +11,7 @@ export abstract class LinearShape<T> {
 }
 
 export interface StreamStageBuilder<T> {
-  build(): Array<NodeJS.ReadableStream | NodeJS.ReadWriteStream>
+  build(): Array<stream.Readable | stream.Duplex>
 }
 
 export function MappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, mapFn: (t: T) => U) {
@@ -201,7 +201,7 @@ export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements:
 export function FlatMappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, flatMapFn: (v: T) => StreamStageBuilder<U>) {
   return {
     build() {
-      let currentMaterializedSource: NodeJS.ReadableStream
+      let currentMaterializedSource: stream.Readable
       return [
         ...prev.build(),
         new stream.Transform({
@@ -211,21 +211,16 @@ export function FlatMappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, flatM
               const materializedStreams = flatMapFn(v).build()
               currentMaterializedSource = materializedStreams[materializedStreams.length - 1]
               if (materializedStreams.length > 1) {
-                stream.pipeline(materializedStreams, e => {})
+                setupMultipleStream(materializedStreams, onNext)
+              } else {
+                setupSingleStream(currentMaterializedSource, onNext)
               }
 
-              currentMaterializedSource
-                .on('data', v => {
-                  if (!this.push(v)) {
-                    currentMaterializedSource.pause()
-                  }
-                })
-                .once('close', () => {
-                  onNext()
-                })
-                .once('error', e => {
-                  onNext(e)
-                })
+              currentMaterializedSource.on('data', data => {
+                if (!this.push(data)) {
+                  currentMaterializedSource.pause()
+                }
+              })
             } catch (e) {
               onNext(e)
             }
@@ -240,3 +235,29 @@ export function FlatMappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, flatM
     },
   }
 }
+
+function setupSingleStream(
+  currentMaterializedSource: stream.Readable,
+  onNext: (error?: Error | null | undefined) => void,
+): void {
+  // The handling is different for a single node tream because we have to destroy it ourselve
+  // whereas with several node streams, pipeline helper will do it for es
+  currentMaterializedSource
+    .once('error', e => {
+      currentMaterializedSource.destroy()
+      onNext(e)
+    })
+    .once('end', () => {
+      currentMaterializedSource.destroy()
+      onNext()
+    })
+}
+
+function setupMultipleStream(
+  materializedStreams: (stream.Readable | stream.Duplex)[],
+  onNext: (error?: Error | null | undefined) => void,
+): void {
+  stream.pipeline(materializedStreams, e => {
+    onNext(e)
+  })
+}