// stream.test.ts (5.6 KB)
  1. import * as assert from 'assert'
  2. import * as fc from 'fast-check'
  3. import { Source } from '../src/Source'
  4. import * as Sink from '../src/Sink'
  5. import { Flow } from '../src/Flow'
  6. describe('Source', function () {
  7. this.timeout(5000)
  // Property: summing a stream built from an arbitrary integer array gives the
  // same total as folding that array with Array.prototype.reduce.
  it('should run a simple sum pipeline', async () => {
    const sumMatchesReduce = fc.asyncProperty(fc.array<number>(fc.integer()), async numbers => {
      const expected = numbers.reduce((total, n) => total + n, 0)
      const actual = await Source.fromArray(numbers).into(Sink.sum).run()
      assert.equal(actual, expected)
    })
    await fc.assert(sumMatchesReduce)
  })
  // Property: doubling every element with `map` before summing yields twice
  // the plain sum of the input array.
  it('should run a simple sum with map', async () => {
    const doubledSumProperty = fc.asyncProperty(fc.array<number>(fc.integer()), async numbers => {
      const doubled = Source.fromArray<number>(numbers).map(n => n * 2)
      const actual = await doubled.into(Sink.sum).run()
      const plainSum = numbers.reduce((total, n) => total + n, 0)
      assert.equal(actual, 2 * plainSum)
    })
    await fc.assert(doubledSumProperty)
  })
  // Property: for an array of ones and a group size in 1..10, `grouped` emits
  // batches no larger than the group size, and the batch lengths add up to the
  // input length (i.e. no element is lost).
  it('should run a grouped without missing any element', async () => {
    const property = fc.asyncProperty(
      fc.array(fc.constant(1)),
      fc.integer(1, 10),
      async (ones, groupSize) => {
        const batchSizes = Source.fromArray(ones)
          .grouped(groupSize)
          .map(batch => {
            const size = batch.length
            assert.ok(size <= groupSize)
            return size
          })
        const total = await batchSizes.into(Sink.sum).run()
        assert.equal(total, ones.length, 'An element is missing')
      },
    )
    await fc.assert(property)
  })
  // Property: for an array of ones and a group size in 1..10, every batch
  // emitted by `grouped` contains at least one element.
  it('should run a grouped without flushing an empty array', async () => {
    const property = fc.asyncProperty(
      fc.array(fc.constant(1)),
      fc.integer(1, 10),
      async (ones, groupSize) => {
        const batchSizes = Source.fromArray(ones)
          .grouped(groupSize)
          .map(batch => {
            const size = batch.length
            assert.ok(size > 0)
            return size
          })
        // The sum is discarded; running the pipeline is what triggers the check above.
        await batchSizes.into(Sink.sum).run()
      },
    )
    await fc.assert(property)
  })
  // Property: `mapAsync` preserves input order for any concurrency level in
  // 1..50, even though each element resolves after its own random 1-4 ms delay.
  it('should run ordered async', async () => {
    // Integer delay in [1, 4] ms, drawn afresh for every element.
    const randomDelayMs = () => 1 + Math.floor(Math.random() * 4)
    const property = fc.asyncProperty(
      fc.array(fc.integer()),
      fc.integer(1, 50),
      async (numbers, concurrencyLevel) => {
        const delayed = Source.fromArray(numbers).mapAsync(concurrencyLevel, value => {
          return new Promise(resolve => {
            setTimeout(() => resolve(value), randomDelayMs())
          })
        })
        // Collect the emitted values in arrival order so they can be compared to the input.
        const collected = await delayed
          .into(Sink.reduce<number, number[]>((seen, next) => [...seen, next], []))
          .run()
        assert.deepEqual(collected, numbers)
      },
    )
    await fc.assert(property)
  })
  // Property: `mapAsyncUnordered` processes every element exactly once for any
  // concurrency level in 1..50; the sum is order-independent, so doubling each
  // value after a 2 ms delay must give twice the plain sum.
  it('should run unordered async computations', async () => {
    const property = fc.asyncProperty(
      fc.array(fc.integer()),
      fc.integer(1, 50),
      async (numbers, concurrencyLevel) => {
        const doubledLater = Source.fromArray(numbers).mapAsyncUnordered(concurrencyLevel, value => {
          return new Promise(resolve => {
            setTimeout(() => resolve(value * 2), 2)
          })
        })
        const actual = await doubledLater.into(Sink.sum).run()
        const plainSum = numbers.reduce((total, n) => total + n, 0)
        assert.equal(actual, 2 * plainSum)
      },
    )
    await fc.assert(property)
  })
  // Property: after filtering for even numbers, the stream sum equals the sum
  // of the even elements of the input array.
  // The previous check only asserted that the sum was even, which also holds
  // for a filter that drops every element.
  it('should filter elements', async () => {
    await fc.assert(
      fc.asyncProperty(fc.array(fc.integer()), async arr => {
        const isEven = (n: number) => n % 2 === 0
        const result = await Source.fromArray(arr).filter(isEven).into(Sink.sum).run()
        const expected = arr.filter(isEven).reduce((a, b) => a + b, 0)
        // Loose equality, as in the other sum checks of this suite.
        assert.equal(result, expected)
      }),
    )
  })
  // Runs five elements through `throttle(2, 10)` and checks both the sum and
  // the wall-clock time taken.
  // NOTE(review): presumably throttle(2, 10) lets 2 elements through per 10 ms,
  // so 5 elements need at least 20 ms — confirm against Source.throttle.
  it('should throttle elements', async () => {
    const before = Date.now()
    const result = await Source.fromArray([1, 2, 3, 4, 5]).throttle(2, 10).into(Sink.sum).run()
    const duration = Date.now() - before
    assert.equal(result, 15)
    // The failure message promises "at least 20ms", so the bound is inclusive:
    // a run measured at exactly 20 ms is a pass.
    assert.ok(duration >= 20, `must at least be 20ms but was ${duration}`)
  })
  // Property: `Sink.forEach` invokes its callback for every element; adding
  // each one to a local counter must reproduce the plain sum of the input.
  it('should run forEach correctly', async () => {
    const property = fc.asyncProperty(fc.array(fc.integer()), async numbers => {
      let seenTotal = 0
      const pipeline = Source.fromArray(numbers).into(Sink.forEach(n => (seenTotal += n)))
      await pipeline.run()
      const expected = numbers.reduce((total, n) => total + n, 0)
      assert.strictEqual(seenTotal, expected)
    })
    await fc.assert(property)
  })
  // Property: a Flow that doubles each value, attached to `Sink.sum` and used
  // as the target of `Source.into`, yields twice the plain sum of the input.
  it('should run a flow correctly into a sink', async () => {
    // The property must go through fc.assert: fc.asyncProperty alone only
    // builds the property and never executes it, so the test passed vacuously.
    await fc.assert(
      fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
        const result = await Source.fromArray<number>(arr)
          .into(
            Flow.of<number>()
              .map(v => v * 2)
              .into(Sink.sum),
          )
          .run()
        assert.strictEqual(result, 2 * arr.reduce((a, b) => a + b, 0))
      }),
    )
  })
  // Property: routing the source through a doubling Flow with `via` and then
  // summing yields twice the plain sum of the input.
  it('should run a flow with `via`', async () => {
    // The property must go through fc.assert: fc.asyncProperty alone only
    // builds the property and never executes it, so the test passed vacuously.
    await fc.assert(
      fc.asyncProperty(fc.array<number>(fc.integer()), async arr => {
        const result = await Source.fromArray<number>(arr)
          .via(Flow.of<number>().map(v => v * 2))
          .into(Sink.sum)
          .run()
        assert.strictEqual(result, 2 * arr.reduce((a, b) => a + b, 0))
      }),
    )
  })
  // Flattens two arrays of letters with `flatMap` and concatenates the emitted
  // letters; the expected string shows that inner sources are emitted in order.
  it('should run with simple flatMap', async () => {
    const nested: string[][] = [
      ['a', 'b', 'c'],
      ['d', 'e', 'f'],
    ]
    const flattened = Source.fromArray<string[]>(nested).flatMap(letters => Source.fromArray(letters))
    const concatenated = await flattened.into(Sink.reduce((joined, letter) => joined + letter, '')).run()
    assert.strictEqual(concatenated, 'abcdef')
  })
  // Flattens two arrays of letters where each inner source drops 'c' and
  // duplicates every remaining letter before the results are concatenated.
  it('should run with more complex flatMap', async () => {
    const result = await Source.fromArray<string[]>([
      ['a', 'b', 'c'],
      ['d', 'e', 'f'],
    ])
      .flatMap(arr =>
        Source.fromArray(arr)
          // Strict inequality: both operands are strings, so no coercion is wanted.
          .filter(v => v !== 'c')
          .map(v => v + v),
      )
      .into(Sink.reduce((a, b) => a + b, ''))
      .run()
    assert.strictEqual(result, 'aabbddeeff')
  })
  // Property: taking `num` elements from an endless stream of ones and summing
  // them yields exactly `num`.
  it('should limit the size of a stream with take', async () => {
    // Endless generator of ones; only `take` bounds the pipeline.
    function* infinite() {
      while (true) {
        yield 1
      }
    }
    // Two fixes over the previous version:
    //  - the property goes through fc.assert; fc.asyncProperty alone never runs it;
    //  - counts are drawn from 0..1000, because a negative count can never equal
    //    a sum of ones and an unbounded count would make each run arbitrarily long.
    await fc.assert(
      fc.asyncProperty(fc.nat(1000), async num => {
        const result = await Source.fromIterable(infinite).take(num).into(Sink.sum).run()
        assert.strictEqual(result, num)
      }),
    )
  })
  175. })