streams.test.ts 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import * as assert from 'assert'
  2. import * as fc from 'fast-check'
  3. import { Source } from './Source'
  4. import * as Sink from './Sink'
  5. describe('Source', function () {
  6. this.timeout(5000)
  7. it('should run a simple sum pipeline', async () => {
  8. await fc.assert(
  9. fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
  10. const result = await Source.fromArray(arr).into(Sink.sum).run()
  11. assert.equal(
  12. result,
  13. arr.reduce((a, b) => a + b, 0),
  14. )
  15. }),
  16. )
  17. })
  18. it('should run a simple sum with map', async () => {
  19. await fc.assert(
  20. fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
  21. const result = await Source.fromArray<number>(arr)
  22. .map(x => x * 2)
  23. .into(Sink.sum)
  24. .run()
  25. assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
  26. }),
  27. )
  28. })
  29. it('should run a grouped without missing any element', async () => {
  30. await fc.assert(
  31. fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
  32. const result = await Source.fromArray(arr)
  33. .grouped(groupSize)
  34. .map(v => {
  35. assert.ok(v.length <= groupSize)
  36. return v.length
  37. })
  38. .into(Sink.sum)
  39. .run()
  40. assert.equal(result, arr.length, 'An element is missing')
  41. }),
  42. )
  43. })
  44. it('should run ordered async', async () => {
  45. await fc.assert(
  46. fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
  47. const timeout = () => 1 + (0 | (Math.random() * 4))
  48. const result = await Source.fromArray(arr)
  49. .mapAsync(concurrencyLevel, v => {
  50. return new Promise(resolve => setTimeout(() => resolve(v), timeout()))
  51. })
  52. .into(Sink.reduce<number, number[]>((acc, b) => [...acc, b], []))
  53. .run()
  54. assert.deepEqual(result, arr)
  55. }),
  56. )
  57. })
  58. it('should run unordered async computations', async () => {
  59. await fc.assert(
  60. fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
  61. const result = await Source.fromArray(arr)
  62. .mapAsyncUnordered(concurrencyLevel, v => {
  63. return new Promise(resolve => setTimeout(() => resolve(v * 2), 2))
  64. })
  65. .into(Sink.sum)
  66. .run()
  67. assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
  68. }),
  69. )
  70. })
  71. it('should filter elements', async () => {
  72. await fc.assert(
  73. fc.asyncProperty(fc.array(fc.integer()), async arr => {
  74. const result = await Source.fromArray(arr)
  75. .filter(n => n % 2 === 0)
  76. .into(Sink.sum)
  77. .run()
  78. assert.deepEqual(result % 2, 0)
  79. }),
  80. )
  81. })
  82. it('should throttle elements', async () => {
  83. const before = Date.now()
  84. const result = await Source.fromArray([1, 2, 3, 4, 5]).throttle(2, 10).into(Sink.sum).run()
  85. const after = Date.now()
  86. const duration = after - before
  87. assert.equal(result, 15)
  88. assert.ok(duration > 20, 'must at least be 20ms but was ' + duration)
  89. })
  90. it('should run forEach correctly', async () => {
  91. await fc.assert(
  92. fc.asyncProperty(fc.array(fc.integer()), async arr => {
  93. let acc = 0
  94. await Source.fromArray(arr)
  95. .into(Sink.forEach(n => (acc += n)))
  96. .run()
  97. assert.strictEqual(
  98. acc,
  99. arr.reduce((a, b) => a + b, 0),
  100. )
  101. }),
  102. )
  103. })
  104. })