Browse Source

fix grouped flushing an empty array

Aurélien Richez 4 years ago
parent
commit
3c394840f1
2 changed files with 20 additions and 1 deletions
  1. 5 1
      src/Source.ts
  2. 15 0
      src/streams.test.ts

+ 5 - 1
src/Source.ts

@@ -192,7 +192,11 @@ function BatchedSourceBuilder<T>(prev: SourceBuilder<T>, batchSize: number) {
             }
           },
           flush(callback) {
-            callback(undefined, currentBatch)
+            if(currentBatch.length > 0) {
+              callback(undefined, currentBatch)
+            } else {
+              callback()
+            }
           },
         }),
       ]

+ 15 - 0
src/streams.test.ts

@@ -46,6 +46,21 @@ describe('Source', function () {
     )
   })
 
+  it('should run a grouped without flushing an empty array', async () => {
+    await fc.assert(
+      fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
+        await Source.fromArray(arr)
+          .grouped(groupSize)
+          .map(v => {
+            assert.ok(v.length > 0)
+            return v.length
+          })
+          .into(Sink.sum)
+          .run()
+      }),
+    )
+  })
+
   it('should run ordered async', async () => {
     await fc.assert(
       fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {