Sink.ts 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import * as stream from 'stream'
  2. import { Phantom } from './internal/common'
  /**
   * Sentinel handed to the completion callback by sinks that produce no
   * meaningful result of their own (`ignore` and `forEach` in this file).
   */
  export const Done: unique symbol = Symbol('Done')

  /** Type of the `Done` sentinel, so it can be used as a materialized-value type. */
  export type Done = typeof Done
  /**
   * A consumer of stream elements.
   *
   * @typeParam T   Type of the elements the sink accepts.
   * @typeParam Mat Type of the value the sink reports once its writable
   *                stream has finished.
   */
  export interface Sink<T, Mat> {
    /** Factory used to create the underlying Node.js stream(s) for this sink. */
    builder: SinkBuilder<Mat>

    /**
     * Carries the element type `T` in the shape of the interface. The sinks in
     * this file fill it with `Phantom()`.
     * NOTE(review): presumably never read at runtime; confirm against
     * `./internal/common`.
     */
    __phantom__: T
  }
  /**
   * Recipe for turning a sink description into concrete Node.js streams.
   *
   * @typeParam M Type of the value passed to the `out` callback.
   */
  interface SinkBuilder<M> {
    /**
     * Optional factory for additional streams.
     * NOTE(review): no sink in this file defines it; presumably these streams
     * are placed upstream of the writable. Confirm against the caller.
     */
    buildStreams?: () => (NodeJS.ReadableStream | NodeJS.ReadWriteStream)[]

    /**
     * Creates the terminal writable stream.
     *
     * @param out Callback through which the sink reports its result value.
     * @returns The writable stream that consumes the elements.
     */
    buildWritable(out: (value: M) => void): stream.Writable
  }
  /**
   * Sink that adds up every number written to it and reports the total through
   * `out` when the writable stream emits `finish`. An empty stream yields `0`.
   */
  export const sum: Sink<number, number> = {
    builder: {
      buildWritable: (out) => {
        // Running total; starts at the additive identity.
        let total = 0
        const writable = new stream.Writable({
          objectMode: true,
          write: (chunk, _encoding, done) => {
            total += chunk
            done()
          },
        })
        // Report the result only once all writes have been processed.
        writable.on('finish', () => out(total))
        return writable
      },
    },
    __phantom__: Phantom(),
  }
  /**
   * Sink that discards every element it receives and reports `Done` through
   * `out` when the writable stream emits `finish`.
   */
  export const ignore: Sink<any, Done> = {
    builder: {
      buildWritable: (out) => {
        const writable = new stream.Writable({
          objectMode: true,
          // The chunk is dropped on purpose; only acknowledge it.
          write: (_chunk, _encoding, done) => {
            done()
          },
        })
        writable.on('finish', () => out(Done))
        return writable
      },
    },
    __phantom__: Phantom(),
  }
  /**
   * Creates a sink that folds all incoming elements into a single value.
   *
   * The accumulator starts at `zero` for every writable built from the sink and
   * is replaced by `reduceFn(acc, element)` for each element written. The final
   * accumulator is reported through `out` when the writable emits `finish`.
   *
   * If `reduceFn` throws, the exception is passed to the write callback so the
   * stream fails through its normal error path instead of throwing out of
   * `write()`, which Node.js documents as undefined behaviour. In that case
   * `finish` is not emitted and `out` is never called.
   *
   * @param reduceFn Combines the current accumulator with the next element.
   * @param zero     Initial accumulator; also the result for an empty stream.
   * @returns A sink whose result is the final accumulator.
   */
  export function reduce<V, Mat>(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Sink<V, Mat> {
    return {
      builder: {
        buildWritable(out) {
          let acc = zero
          return new stream.Writable({
            objectMode: true,
            write(v, _, onNext) {
              try {
                acc = reduceFn(acc, v)
              } catch (err) {
                // Route user-code failures through the stream's error channel.
                onNext(err instanceof Error ? err : new Error(String(err)))
                return
              }
              onNext()
            },
          }).on('finish', () => out(acc))
        },
      },
      __phantom__: Phantom(),
    }
  }
  /**
   * Creates a sink that invokes `fn` once for every element written to it and
   * reports `Done` through `out` when the writable emits `finish`.
   *
   * If `fn` throws, the exception is passed to the write callback so the stream
   * fails through its normal error path instead of throwing out of `write()`,
   * which Node.js documents as undefined behaviour. In that case `finish` is
   * not emitted and `out` is never called.
   *
   * @param fn Side-effecting callback applied to each element, in arrival order.
   * @returns A sink whose result is the `Done` sentinel.
   */
  export function forEach<T>(fn: (t: T) => void): Sink<T, Done> {
    return {
      builder: {
        buildWritable(out) {
          return new stream.Writable({
            objectMode: true,
            write(v, _, onNext) {
              try {
                fn(v)
              } catch (err) {
                // Route user-code failures through the stream's error channel.
                onNext(err instanceof Error ? err : new Error(String(err)))
                return
              }
              onNext()
            },
          }).on('finish', () => out(Done))
        },
      },
      __phantom__: Phantom(),
    }
  }