| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- import * as assert from 'assert'
- import * as fc from 'fast-check'
- import { Source } from '../src/Source'
- import * as Sink from '../src/Sink'
- import { Flow } from '../src/Flow'
- describe('Source', function () {
- this.timeout(5000)
- it('should run a simple sum pipeline', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
- const result = await Source.fromArray(arr).into(Sink.sum).run()
- assert.equal(
- result,
- arr.reduce((a, b) => a + b, 0),
- )
- }),
- )
- })
- it('should run a simple sum with map', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
- const result = await Source.fromArray<number>(arr)
- .map(x => x * 2)
- .into(Sink.sum)
- .run()
- assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
- }),
- )
- })
- it('should run a grouped without missing any element', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
- const result = await Source.fromArray(arr)
- .grouped(groupSize)
- .map(v => {
- assert.ok(v.length <= groupSize)
- return v.length
- })
- .into(Sink.sum)
- .run()
- assert.equal(result, arr.length, 'An element is missing')
- }),
- )
- })
- it('should run a grouped without flushing an empty array', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
- await Source.fromArray(arr)
- .grouped(groupSize)
- .map(v => {
- assert.ok(v.length > 0)
- return v.length
- })
- .into(Sink.sum)
- .run()
- }),
- )
- })
- it('should run ordered async', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
- const timeout = () => 1 + (0 | (Math.random() * 4))
- const result = await Source.fromArray(arr)
- .mapAsync(concurrencyLevel, v => {
- return new Promise(resolve => setTimeout(() => resolve(v), timeout()))
- })
- .into(Sink.reduce<number, number[]>((acc, b) => [...acc, b], []))
- .run()
- assert.deepEqual(result, arr)
- }),
- )
- })
- it('should run unordered async computations', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
- const result = await Source.fromArray(arr)
- .mapAsyncUnordered(concurrencyLevel, v => {
- return new Promise(resolve => setTimeout(() => resolve(v * 2), 2))
- })
- .into(Sink.sum)
- .run()
- assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
- }),
- )
- })
- it('should filter elements', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array(fc.integer()), async arr => {
- const result = await Source.fromArray(arr)
- .filter(n => n % 2 === 0)
- .into(Sink.sum)
- .run()
- assert.deepEqual(result % 2, 0)
- }),
- )
- })
- it('should throttle elements', async () => {
- const before = Date.now()
- const result = await Source.fromArray([1, 2, 3, 4, 5]).throttle(2, 10).into(Sink.sum).run()
- const after = Date.now()
- const duration = after - before
- assert.equal(result, 15)
- assert.ok(duration > 20, 'must at least be 20ms but was ' + duration)
- })
- it('should run forEach correctly', async () => {
- await fc.assert(
- fc.asyncProperty(fc.array(fc.integer()), async arr => {
- let acc = 0
- await Source.fromArray(arr)
- .into(Sink.forEach(n => (acc += n)))
- .run()
- assert.strictEqual(
- acc,
- arr.reduce((a, b) => a + b, 0),
- )
- }),
- )
- })
- it('should run a flow correctly into a sink', async () => {
- await fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
- const result = await Source.fromArray<number>(arr)
- .into(
- Flow.of<number>()
- .map(v => v * 2)
- .into(Sink.sum),
- )
- .run()
- assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
- })
- })
- it('should run a flow with `via`', async () => {
- await fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
- const result = await Source.fromArray<number>(arr)
- .via(Flow.of<number>().map(v => v * 2))
- .into(Sink.sum)
- .run()
- assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
- })
- })
- })
|