import * as assert from 'assert'
import * as fc from 'fast-check'
import { Source } from '../src/Source'
import * as Sink from '../src/Sink'
import { Flow } from '../src/Flow'

/**
 * Reference sum used as the oracle for the pipeline results below.
 *
 * @param {number[]} values - numbers to add up
 * @returns {number} the arithmetic sum, `0` for an empty array
 */
const sumOf = values => values.reduce((a, b) => a + b, 0)

/**
 * Property-based and example-based tests for `Source` pipelines.
 *
 * Every property is handed to `fc.assert`: building a property with
 * `fc.asyncProperty` alone does not execute it, so a test that merely awaits
 * the property object would pass without checking anything.
 */
describe('Source', function () {
  // Regular function (not an arrow) so that `this` is the mocha suite context.
  this.timeout(5000)

  it('should run a simple sum pipeline', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        const result = await Source.fromArray(arr).into(Sink.sum).run()
        assert.equal(result, sumOf(arr))
      }),
    )
  })

  it('should run a simple sum with map', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        const result = await Source.fromArray(arr)
          .map(x => x * 2)
          .into(Sink.sum)
          .run()
        assert.equal(result, 2 * sumOf(arr))
      }),
    )
  })

  it('should run a grouped without missing any element', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
        // Summing the group lengths must give back the number of input elements.
        const result = await Source.fromArray(arr)
          .grouped(groupSize)
          .map(v => {
            assert.ok(v.length <= groupSize)
            return v.length
          })
          .into(Sink.sum)
          .run()
        assert.equal(result, arr.length, 'An element is missing')
      }),
    )
  })

  it('should run a grouped without flushing an empty array', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
        await Source.fromArray(arr)
          .grouped(groupSize)
          .map(v => {
            assert.ok(v.length > 0)
            return v.length
          })
          .into(Sink.sum)
          .run()
      }),
    )
  })

  it('should run ordered async', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
        // Random 1-4ms delay so that completions can happen out of order.
        const timeout = () => 1 + (0 | (Math.random() * 4))
        const result = await Source.fromArray(arr)
          .mapAsync(concurrencyLevel, v => {
            return new Promise(resolve => setTimeout(() => resolve(v), timeout()))
          })
          .into(Sink.reduce((acc, b) => [...acc, b], []))
          .run()
        assert.deepEqual(result, arr)
      }),
    )
  })

  it('should run unordered async computations', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
        // Only the sum is checked, so the emission order does not matter here.
        const result = await Source.fromArray(arr)
          .mapAsyncUnordered(concurrencyLevel, v => {
            return new Promise(resolve => setTimeout(() => resolve(v * 2), 2))
          })
          .into(Sink.sum)
          .run()
        assert.equal(result, 2 * sumOf(arr))
      }),
    )
  })

  it('should filter elements', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        const result = await Source.fromArray(arr)
          .filter(n => n % 2 === 0)
          .into(Sink.sum)
          .run()
        // Loose equality on purpose: `%` yields -0 for a negative even sum,
        // and strictEqual (Object.is semantics) would reject -0 against 0.
        assert.equal(result % 2, 0)
      }),
    )
  })

  it('should throttle elements', async () => {
    const before = Date.now()
    const result = await Source.fromArray([1, 2, 3, 4, 5]).throttle(2, 10).into(Sink.sum).run()
    const duration = Date.now() - before
    assert.equal(result, 15)
    // `>=` so the check agrees with its own message ("at least 20ms").
    assert.ok(duration >= 20, 'must at least be 20ms but was ' + duration)
  })

  it('should run forEach correctly', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        let acc = 0
        await Source.fromArray(arr)
          .into(
            Sink.forEach(n => {
              acc += n
            }),
          )
          .run()
        assert.strictEqual(acc, sumOf(arr))
      }),
    )
  })

  it('should run a flow correctly into a sink', async () => {
    // Wrapped in fc.assert so the property is actually executed.
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        const result = await Source.fromArray(arr)
          .into(
            Flow.of()
              .map(v => v * 2)
              .into(Sink.sum),
          )
          .run()
        assert.strictEqual(result, 2 * sumOf(arr))
      }),
    )
  })

  it('should run a flow with `via`', async () => {
    // Wrapped in fc.assert so the property is actually executed.
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        const result = await Source.fromArray(arr)
          .via(Flow.of().map(v => v * 2))
          .into(Sink.sum)
          .run()
        assert.strictEqual(result, 2 * sumOf(arr))
      }),
    )
  })

  it('should run with simple flatMap', async () => {
    const result = await Source.fromArray([
      ['a', 'b', 'c'],
      ['d', 'e', 'f'],
    ])
      .flatMap(arr => Source.fromArray(arr))
      .into(Sink.reduce((a, b) => a + b, ''))
      .run()
    assert.strictEqual(result, 'abcdef')
  })

  it('should run with more complex flatMap', async () => {
    const result = await Source.fromArray([
      ['a', 'b', 'c'],
      ['d', 'e', 'f'],
    ])
      .flatMap(arr =>
        Source.fromArray(arr)
          .filter(v => v !== 'c')
          .map(v => v + v),
      )
      .into(Sink.reduce((a, b) => a + b, ''))
      .run()
    assert.strictEqual(result, 'aabbddeeff')
  })
})