streams.test.ts 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import * as assert from 'assert'
  2. import * as fc from 'fast-check'
  3. import { Source } from '../src/Source'
  4. import * as Sink from '../src/Sink'
  5. describe('Source', function () {
  6. this.timeout(5000)
  // Property: for any integer array, piping it through Sink.sum yields the
  // same total as folding the array with Array.prototype.reduce.
  it('should run a simple sum pipeline', async () => {
    const sumMatchesReduce = fc.asyncProperty(fc.array<number>(fc.integer()), async values => {
      const actual = await Source.fromArray(values).into(Sink.sum).run()
      const expected = values.reduce((total, value) => total + value, 0)
      assert.equal(actual, expected)
    })
    await fc.assert(sumMatchesReduce)
  })
  // Property: doubling every element before summing gives twice the plain sum
  // of the input array.
  it('should run a simple sum with map', async () => {
    const doubledSumProperty = fc.asyncProperty(fc.array<number>(fc.integer()), async values => {
      const pipeline = Source.fromArray<number>(values)
        .map(x => x * 2)
        .into(Sink.sum)
      const actual = await pipeline.run()
      const plainSum = values.reduce((total, value) => total + value, 0)
      assert.equal(actual, 2 * plainSum)
    })
    await fc.assert(doubledSumProperty)
  })
  // Property: grouping an array of ones into chunks of at most `groupSize`
  // and summing the chunk lengths must give back the original array length.
  it('should run a grouped without missing any element', async () => {
    const onesArb = fc.array(fc.constant(1))
    const groupSizeArb = fc.integer(1, 10)
    const noElementLost = fc.asyncProperty(onesArb, groupSizeArb, async (ones, groupSize) => {
      const countedElements = await Source.fromArray(ones)
        .grouped(groupSize)
        .map(group => {
          // No emitted group may be larger than the requested size.
          assert.ok(group.length <= groupSize)
          return group.length
        })
        .into(Sink.sum)
        .run()
      assert.equal(countedElements, ones.length, 'An element is missing')
    })
    await fc.assert(noElementLost)
  })
  // Property: whatever the input length and group size, every group that
  // reaches the map stage contains at least one element.
  it('should run a grouped without flushing an empty array', async () => {
    const onesArb = fc.array(fc.constant(1))
    const groupSizeArb = fc.integer(1, 10)
    const neverEmitsEmptyGroup = fc.asyncProperty(onesArb, groupSizeArb, async (ones, groupSize) => {
      const pipeline = Source.fromArray(ones)
        .grouped(groupSize)
        .map(group => {
          assert.ok(group.length > 0)
          return group.length
        })
        .into(Sink.sum)
      // The result is intentionally unused: only the in-stream assertion matters.
      await pipeline.run()
    })
    await fc.assert(neverEmitsEmptyGroup)
  })
  // Property: mapAsync with any concurrency level between 1 and 50 must emit
  // results in input order, even though each element resolves after a random
  // delay of 1 to 4 milliseconds.
  it('should run ordered async', async () => {
    const orderIsPreserved = fc.asyncProperty(
      fc.array(fc.integer()),
      fc.integer(1, 50),
      async (values, concurrencyLevel) => {
        const randomDelayMs = () => 1 + Math.floor(Math.random() * 4)
        const collected = await Source.fromArray(values)
          .mapAsync(concurrencyLevel, value => {
            return new Promise(resolve => {
              setTimeout(() => resolve(value), randomDelayMs())
            })
          })
          // Accumulate every emitted element into a fresh array, in arrival order.
          .into(Sink.reduce<number, number[]>((seen, next) => [...seen, next], []))
          .run()
        assert.deepEqual(collected, values)
      },
    )
    await fc.assert(orderIsPreserved)
  })
  // Property: mapAsyncUnordered may reorder results, so only the total is
  // checked: doubling each element asynchronously must sum to twice the input sum.
  it('should run unordered async computations', async () => {
    const totalIsDoubled = fc.asyncProperty(
      fc.array(fc.integer()),
      fc.integer(1, 50),
      async (values, concurrencyLevel) => {
        const actual = await Source.fromArray(values)
          .mapAsyncUnordered(concurrencyLevel, value => {
            return new Promise(resolve => {
              setTimeout(() => resolve(value * 2), 2)
            })
          })
          .into(Sink.sum)
          .run()
        const plainSum = values.reduce((total, value) => total + value, 0)
        assert.equal(actual, 2 * plainSum)
      },
    )
    await fc.assert(totalIsDoubled)
  })
  // Property: after keeping only the even numbers, the summed output equals
  // the sum of the even elements of the input array.
  it('should filter elements', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        const isEven = (n: number) => n % 2 === 0
        const result = await Source.fromArray(arr)
          .filter(isEven)
          .into(Sink.sum)
          .run()
        // Checking only the parity of the result would also pass if even
        // elements were dropped (or if nothing was emitted at all), so compare
        // against the exact expected total instead.
        const expected = arr.filter(n => isEven(n)).reduce((a, b) => a + b, 0)
        assert.equal(result, expected)
      }),
    )
  })
  // Pushes five elements through throttle(2, 10) and checks both the summed
  // result and the elapsed wall-clock time.
  // NOTE(review): presumably throttle(2, 10) lets 2 elements through per 10ms
  // window, which is why 5 elements are expected to need 20ms; confirm
  // against Source.throttle.
  it('should throttle elements', async () => {
    const before = Date.now()
    const result = await Source.fromArray([1, 2, 3, 4, 5]).throttle(2, 10).into(Sink.sum).run()
    const duration = Date.now() - before
    assert.equal(result, 15)
    // The failure message promises "at least 20ms", so a run measured at
    // exactly 20ms must pass; Date.now() only has millisecond resolution.
    assert.ok(duration >= 20, `must at least be 20ms but was ${duration}`)
  })
  // Property: Sink.forEach invokes its callback for every element, so a
  // running total kept by the callback ends up equal to the array's sum.
  it('should run forEach correctly', async () => {
    const visitsEveryElement = fc.asyncProperty(fc.array(fc.integer()), async values => {
      let runningTotal = 0
      const pipeline = Source.fromArray(values).into(
        Sink.forEach(value => {
          runningTotal += value
          // Return the updated total, as the original expression-bodied callback did.
          return runningTotal
        }),
      )
      await pipeline.run()
      const expected = values.reduce((total, value) => total + value, 0)
      assert.strictEqual(runningTotal, expected)
    })
    await fc.assert(visitsEveryElement)
  })
  118. })