StreamStageBuilder.ts 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import * as stream from 'stream'
  2. import * as assert from 'assert'
  /**
   * One stage of a stream pipeline description.
   *
   * `T` labels the element type flowing out of the stage (the builders in this
   * file feed it to their mapping/filtering functions); no member references
   * it, so it exists purely for compile-time bookkeeping.
   */
  export interface StreamStageBuilder<T> {
    /** Returns the list of streams for the pipeline up to and including this stage. */
    build(): (NodeJS.ReadableStream | NodeJS.ReadWriteStream)[]
  }
  /**
   * Appends a synchronous mapping stage to the pipeline described by `prev`.
   *
   * @param prev  Builder for the upstream stages; its `build()` runs every time
   *              this stage is built.
   * @param mapFn Called with each chunk. Its return value is passed to the
   *              transform callback as the output; anything thrown inside the
   *              `try` is passed to the callback as the error instead.
   * @returns An object whose `build()` yields the upstream streams followed by
   *          a freshly created object-mode `Transform`.
   */
  export function MappedStreamBuilder<T, U>(prev: StreamStageBuilder<T>, mapFn: (t: T) => U) {
    const createMapper = () =>
      new stream.Transform({
        objectMode: true,
        transform(chunk, _encoding, done) {
          try {
            done(undefined, mapFn(chunk))
          } catch (err) {
            // Forwarded unchanged; the assertion only satisfies the callback's type.
            done(err as Error)
          }
        },
      })

    return {
      build() {
        // Upstream is built first, then this stage's transform is created.
        const upstream = prev.build()
        return [...upstream, createMapper()]
      },
    }
  }
  /**
   * Appends an asynchronous, order-preserving mapping stage to the pipeline
   * described by `prev`.
   *
   * The stage is made of two object-mode transforms:
   *  1. a limiter that starts `mapFn` for each chunk, pushes the resulting
   *     promise downstream, and stops accepting input while `maxConcurrency`
   *     calls are in flight;
   *  2. a resolver that awaits those promises one at a time, so results are
   *     emitted in input order.
   *
   * @param prev           Builder for the upstream stages.
   * @param maxConcurrency Maximum number of `mapFn` calls in flight per built pipeline.
   * @param mapFn          Async mapper. A rejection (or a synchronous throw) becomes
   *                       the error of the resolver transform.
   * @returns A builder whose `build()` yields the upstream streams followed by
   *          the limiter and the resolver.
   * @throws AssertionError inside the limiter's transform if more than
   *         `maxConcurrency` calls would be in flight.
   */
  export function MappedAsyncStreamBuilder<T, U>(
    prev: StreamStageBuilder<T>,
    maxConcurrency: number,
    mapFn: (t: T) => Promise<U>,
  ): StreamStageBuilder<U> {
    return {
      build() {
        // State is per built pipeline so that several pipelines created from
        // the same builder do not count against each other's limit.
        let inFlight = 0
        // Write callback withheld while the limit is reached, if any.
        let resumeInput: (() => void) | undefined

        return [
          ...prev.build(),
          new stream.Transform({
            objectMode: true,
            transform(v, _encoding, callback) {
              inFlight++
              assert.ok(inFlight <= maxConcurrency, `too much concurrent data (concurrency: ${inFlight})`)
              const promise = Promise.resolve(v).then(mapFn)
              this.push(promise)
              if (inFlight < maxConcurrency) {
                callback()
              } else {
                // Limit reached: keep the callback so a finishing task can release it.
                resumeInput = callback
              }
              const release = () => {
                inFlight--
                const resume = resumeInput
                resumeInput = undefined
                if (resume) {
                  resume()
                }
              }
              // then(release, release) instead of finally(): finally() returns a
              // promise that re-rejects, which would surface as an unhandled
              // rejection. This also marks `promise` as handled before the
              // resolver gets to it; the resolver still observes the rejection.
              promise.then(release, release)
            },
          }),
          new stream.Transform({
            objectMode: true,
            writableHighWaterMark: maxConcurrency,
            transform(promise: Promise<U>, _encoding, callback) {
              promise.then(result => callback(undefined, result), callback)
            },
          }),
        ]
      },
    }
  }
  /**
   * Appends an asynchronous mapping stage that emits results as soon as they
   * are ready, i.e. not necessarily in input order.
   *
   * Up to `maxConcurrency` `mapFn` calls run at once per built pipeline; while
   * the limit is reached the stage stops accepting input. When the input ends,
   * the stage waits for every outstanding call before finishing.
   *
   * @param prev           Builder for the upstream stages.
   * @param maxConcurrency Maximum number of `mapFn` calls in flight per built pipeline.
   * @param mapFn          Async mapper. `null`/`undefined` results are skipped,
   *                       matching what the transform callback does in the ordered
   *                       variant. A rejection destroys the stage with that error
   *                       (non-`Error` reasons are wrapped in an `Error`).
   * @returns A builder whose `build()` yields the upstream streams followed by
   *          the new object-mode `Transform`.
   * @throws AssertionError inside the transform if more than `maxConcurrency`
   *         calls would be in flight.
   */
  export function MappedAsyncUnorderedStreamBuilder<T, U>(
    prev: StreamStageBuilder<T>,
    maxConcurrency: number,
    mapFn: (t: T) => Promise<U>,
  ): StreamStageBuilder<U> {
    return {
      build() {
        // All state is per built pipeline.
        let inFlight = 0
        // Write callback withheld while the limit is reached, if any.
        let resumeInput: (() => void) | undefined
        // Outstanding tasks; each entry removes itself when it settles, so the
        // set never holds more than `maxConcurrency` promises.
        const pending = new Set<Promise<void>>()

        return [
          ...prev.build(),
          new stream.Transform({
            objectMode: true,
            transform(v, _encoding, onNext) {
              inFlight++
              assert.ok(inFlight <= maxConcurrency, `too much concurrent data (concurrency: ${inFlight})`)
              const mapped = Promise.resolve(v).then(mapFn)
              if (inFlight < maxConcurrency) {
                onNext()
              } else {
                resumeInput = onNext
              }
              const task: Promise<void> = mapped
                .then(result => {
                  // push(null) would signal end-of-stream, so nullish results are dropped.
                  if (result != null) {
                    this.push(result)
                  }
                })
                .then(
                  () => {
                    pending.delete(task)
                    inFlight--
                    const resume = resumeInput
                    resumeInput = undefined
                    if (resume) {
                      resume()
                    }
                  },
                  (reason: unknown) => {
                    pending.delete(task)
                    inFlight--
                    // The write callback may already have been used for this
                    // chunk, so the failure is reported through destroy()
                    // rather than by calling the callback a second time.
                    this.destroy(reason instanceof Error ? reason : new Error(String(reason)))
                  },
                )
              pending.add(task)
            },
            flush(onEnd) {
              Promise.all(pending).then(() => onEnd(), onEnd)
            },
          }),
        ]
      },
    }
  }
  /**
   * Appends a stage that groups incoming chunks into arrays of `batchSize`
   * elements. When the input ends, any remaining elements are emitted as a
   * final, shorter batch.
   *
   * @param prev      Builder for the upstream stages.
   * @param batchSize Number of elements per emitted batch. A batch is emitted as
   *                  soon as it holds at least this many elements, so a
   *                  fractional size rounds up and a non-positive size yields
   *                  single-element batches instead of buffering the whole stream.
   * @returns A builder whose `build()` yields the upstream streams followed by
   *          the new object-mode `Transform`.
   */
  export function BatchedStreamBuilder<T>(prev: StreamStageBuilder<T>, batchSize: number): StreamStageBuilder<T[]> {
    return {
      build() {
        // The buffer belongs to one built pipeline; pipelines created from the
        // same builder must not mix their elements.
        let currentBatch: T[] = []

        return [
          ...prev.build(),
          new stream.Transform({
            objectMode: true,
            transform(v, _encoding, callback) {
              currentBatch.push(v)
              if (currentBatch.length >= batchSize) {
                // Detach the full batch before handing it over so the emitted
                // array is never appended to afterwards.
                const fullBatch = currentBatch
                currentBatch = []
                callback(undefined, fullBatch)
              } else {
                callback()
              }
            },
            flush(callback) {
              if (currentBatch.length > 0) {
                const lastBatch = currentBatch
                currentBatch = []
                callback(undefined, lastBatch)
              } else {
                callback()
              }
            },
          }),
        ]
      },
    }
  }
  /**
   * Appends a filtering stage to the pipeline described by `prev`.
   *
   * @param prev      Builder for the upstream stages; its `build()` runs every
   *                  time this stage is built.
   * @param predicate Called with each chunk. Chunks for which it returns `true`
   *                  are passed on; the others are dropped. Anything thrown
   *                  inside the `try` is passed to the transform callback as
   *                  the error.
   * @returns An object whose `build()` yields the upstream streams followed by
   *          a freshly created object-mode `Transform`.
   */
  export function FilteredStreamBuilder<T>(prev: StreamStageBuilder<T>, predicate: (v: T) => boolean) {
    return {
      build() {
        const upstream = prev.build()
        const filter = new stream.Transform({
          objectMode: true,
          transform(chunk, _encoding, done) {
            try {
              if (!predicate(chunk)) {
                // Rejected chunk: acknowledge it without emitting anything.
                done()
                return
              }
              this.push(chunk)
              done()
            } catch (err) {
              // Forwarded unchanged; the assertion only satisfies the callback's type.
              done(err as Error)
            }
          },
        })
        return [...upstream, filter]
      },
    }
  }
  /**
   * Appends a rate-limiting stage: once `elements` chunks have passed within
   * the last `perDurationMs` milliseconds, the next chunk is not accepted until
   * the oldest of those timestamps falls out of the window.
   *
   * Every chunk is forwarded immediately; only the acknowledgement of the
   * write (and therefore the intake of the next chunk) is delayed.
   *
   * Note: the timestamp history is created once per call to this function, so
   * all pipelines built from the returned object share the same history.
   *
   * @param prev          Builder for the upstream stages.
   * @param elements      Number of chunks allowed per window.
   * @param perDurationMs Window length in milliseconds.
   * @returns An object whose `build()` yields the upstream streams followed by
   *          a freshly created object-mode `Transform`.
   */
  export function ThrottledStreamBuilder<T>(prev: StreamStageBuilder<T>, elements: number, perDurationMs: number) {
    let timestamps: number[] = []
    return {
      build() {
        const upstream = prev.build()
        const throttle = new stream.Transform({
          objectMode: true,
          transform(chunk, _encoding, done) {
            const now = Date.now()
            // Keep only the timestamps still inside the window, then record this chunk.
            const recent = timestamps.filter(stamp => stamp > now - perDurationMs)
            recent.push(now)
            timestamps = recent

            this.push(chunk)

            const limitReached = timestamps.length >= elements
            if (!limitReached) {
              done()
              return
            }
            // Wait until the oldest recorded chunk leaves the window.
            const delay = timestamps[0] + perDurationMs - Date.now()
            setTimeout(done, delay)
          },
        })
        return [...upstream, throttle]
      },
    }
  }