stream.test.ts 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import * as assert from 'assert'
  2. import * as fc from 'fast-check'
  3. import { Source } from '../src/Source'
  4. import * as Sink from '../src/Sink'
  5. import { Flow } from '../src/Flow'
  6. describe('Source', function () {
  7. this.timeout(5000)
  8. it('should run a simple sum pipeline', async () => {
  9. await fc.assert(
  10. fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
  11. const result = await Source.fromArray(arr).into(Sink.sum).run()
  12. assert.equal(
  13. result,
  14. arr.reduce((a, b) => a + b, 0),
  15. )
  16. }),
  17. )
  18. })
  19. it('should run a simple sum with map', async () => {
  20. await fc.assert(
  21. fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
  22. const result = await Source.fromArray<number>(arr)
  23. .map(x => x * 2)
  24. .into(Sink.sum)
  25. .run()
  26. assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
  27. }),
  28. )
  29. })
  30. it('should run a grouped without missing any element', async () => {
  31. await fc.assert(
  32. fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
  33. const result = await Source.fromArray(arr)
  34. .grouped(groupSize)
  35. .map(v => {
  36. assert.ok(v.length <= groupSize)
  37. return v.length
  38. })
  39. .into(Sink.sum)
  40. .run()
  41. assert.equal(result, arr.length, 'An element is missing')
  42. }),
  43. )
  44. })
  45. it('should run a grouped without flushing an empty array', async () => {
  46. await fc.assert(
  47. fc.asyncProperty(fc.array(fc.constant(1)), fc.integer(1, 10), async (arr, groupSize) => {
  48. await Source.fromArray(arr)
  49. .grouped(groupSize)
  50. .map(v => {
  51. assert.ok(v.length > 0)
  52. return v.length
  53. })
  54. .into(Sink.sum)
  55. .run()
  56. }),
  57. )
  58. })
  59. it('should run ordered async', async () => {
  60. await fc.assert(
  61. fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
  62. const timeout = () => 1 + (0 | (Math.random() * 4))
  63. const result = await Source.fromArray(arr)
  64. .mapAsync(concurrencyLevel, v => {
  65. return new Promise(resolve => setTimeout(() => resolve(v), timeout()))
  66. })
  67. .into(Sink.reduce<number, number[]>((acc, b) => [...acc, b], []))
  68. .run()
  69. assert.deepEqual(result, arr)
  70. }),
  71. )
  72. })
  73. it('should run unordered async computations', async () => {
  74. await fc.assert(
  75. fc.asyncProperty(fc.array(fc.integer()), fc.integer(1, 50), async (arr, concurrencyLevel) => {
  76. const result = await Source.fromArray(arr)
  77. .mapAsyncUnordered(concurrencyLevel, v => {
  78. return new Promise(resolve => setTimeout(() => resolve(v * 2), 2))
  79. })
  80. .into(Sink.sum)
  81. .run()
  82. assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
  83. }),
  84. )
  85. })
  86. it('should filter elements', async () => {
  87. await fc.assert(
  88. fc.asyncProperty(fc.array(fc.integer()), async arr => {
  89. const result = await Source.fromArray(arr)
  90. .filter(n => n % 2 === 0)
  91. .into(Sink.sum)
  92. .run()
  93. assert.deepEqual(result % 2, 0)
  94. }),
  95. )
  96. })
  97. it('should throttle elements', async () => {
  98. const before = Date.now()
  99. const result = await Source.fromArray([1, 2, 3, 4, 5]).throttle(2, 10).into(Sink.sum).run()
  100. const after = Date.now()
  101. const duration = after - before
  102. assert.equal(result, 15)
  103. assert.ok(duration > 20, 'must at least be 20ms but was ' + duration)
  104. })
  105. it('should run forEach correctly', async () => {
  106. await fc.assert(
  107. fc.asyncProperty(fc.array(fc.integer()), async arr => {
  108. let acc = 0
  109. await Source.fromArray(arr)
  110. .into(Sink.forEach(n => (acc += n)))
  111. .run()
  112. assert.strictEqual(
  113. acc,
  114. arr.reduce((a, b) => a + b, 0),
  115. )
  116. }),
  117. )
  118. })
  119. it('should run a flow correctly into a sink', async () => {
  120. await fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
  121. const result = await Source.fromArray<number>(arr)
  122. .into(
  123. Flow.of<number>()
  124. .map(v => v * 2)
  125. .into(Sink.sum),
  126. )
  127. .run()
  128. assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
  129. })
  130. })
  131. it('should run a flow with `via`', async () => {
  132. await fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
  133. const result = await Source.fromArray<number>(arr)
  134. .via(Flow.of<number>().map(v => v * 2))
  135. .into(Sink.sum)
  136. .run()
  137. assert.equal(result, 2 * arr.reduce((a, b) => a + b, 0))
  138. })
  139. })
  140. })