| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- import * as stream from 'stream'
- import { Phantom } from './internal/common'
/**
 * Sentinel value materialized by sinks that have no meaningful result
 * (in this file: `ignore` and `forEach`). Receiving it only signals that the
 * underlying writable emitted 'finish'.
 */
export const Done: unique symbol = Symbol('Done')

/** Type counterpart of the `Done` sentinel, usable in type positions. */
export type Done = typeof Done
/**
 * Description of a terminal stream stage.
 *
 * @typeParam T   Element type the sink accepts.
 * @typeParam Mat Type of the value handed to the `out` callback of
 *                `builder.buildWritable`.
 */
export interface Sink<T, Mat> {
  /** Factory that creates the Node.js stream(s) implementing this sink. */
  builder: SinkBuilder<Mat>

  /**
   * Carries the element type `T` at the type level so it takes part in
   * assignability checks.
   * NOTE(review): populated via `Phantom()` by every sink in this file;
   * presumably never read at runtime — confirm in ./internal/common.
   */
  __phantom__: T
}
/**
 * Recipe for creating the Node.js streams behind a sink.
 *
 * @typeParam Mat Type of the value reported through `out`.
 */
interface SinkBuilder<Mat> {
  /**
   * Optional factory for extra readable / duplex streams.
   * NOTE(review): no sink in this file provides it; presumably these are
   * stages to be piped in front of the writable — confirm against the consumer.
   */
  buildStreams?: () => (NodeJS.ReadableStream | NodeJS.ReadWriteStream)[]

  /**
   * Creates the writable that consumes the elements.
   *
   * @param out Callback receiving the materialized value; the sinks in this
   *            file invoke it from the writable's 'finish' listener.
   * @returns The writable stream to write elements into.
   */
  buildWritable(out: (value: Mat) => void): stream.Writable
}
/**
 * Sink that adds up every number written to it.
 *
 * The running total starts at 0 and is handed to `out` when the writable
 * emits 'finish', so a stream with no elements materializes 0.
 */
export const sum: Sink<number, number> = {
  builder: {
    buildWritable: emit => {
      let total = 0
      const writable = new stream.Writable({
        objectMode: true,
        write: (value: number, _encoding, done) => {
          total += value
          done()
        },
      })
      // `on` returns the stream itself, so returning `writable` after
      // registering the listener is equivalent to returning the chained call.
      writable.on('finish', () => emit(total))
      return writable
    },
  },
  __phantom__: Phantom(),
}
/**
 * Sink that discards every element and materializes `Done` once the
 * writable emits 'finish'.
 */
// `any` is kept on purpose: it is part of the exported type of this sink.
export const ignore: Sink<any, Done> = {
  builder: {
    buildWritable: notify => {
      const drain = new stream.Writable({
        objectMode: true,
        // Acknowledge each chunk immediately without looking at it.
        write: (_chunk, _encoding, done) => {
          done()
        },
      })
      return drain.on('finish', () => notify(Done))
    },
  },
  __phantom__: Phantom(),
}
/**
 * Creates a sink that folds all written elements into a single value.
 *
 * @param reduceFn Combines the accumulator so far with the next element and
 *                 returns the new accumulator.
 * @param zero     Initial accumulator; also the materialized value when no
 *                 element is written before the stream finishes.
 * @returns A sink whose writable passes the final accumulator to `out` when
 *          it emits 'finish'. If `reduceFn` throws, the error is reported
 *          through the write callback (surfacing as the stream's 'error')
 *          instead of escaping synchronously from `write()`.
 */
export function reduce<V, Mat>(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Sink<V, Mat> {
  return {
    builder: {
      buildWritable(out) {
        let acc = zero
        return new stream.Writable({
          objectMode: true,
          write(v: V, _encoding, onNext) {
            try {
              acc = reduceFn(acc, v)
            } catch (err) {
              // Throwing from inside write() is undefined behaviour for Node
              // streams; failures must be passed to the callback instead.
              onNext(err instanceof Error ? err : new Error(String(err)))
              return
            }
            // Kept outside the try block so the callback is never invoked twice.
            onNext()
          },
        }).on('finish', () => out(acc))
      },
    },
    __phantom__: Phantom(),
  }
}
/**
 * Creates a sink that runs a side-effecting function for every element.
 *
 * @param fn Called once per written element, in write order.
 * @returns A sink that materializes `Done` when its writable emits 'finish'.
 *          If `fn` throws, the error is reported through the write callback
 *          (surfacing as the stream's 'error') instead of escaping
 *          synchronously from `write()`.
 */
export function forEach<T>(fn: (t: T) => void): Sink<T, Done> {
  return {
    builder: {
      buildWritable(out) {
        return new stream.Writable({
          objectMode: true,
          write(v: T, _encoding, onNext) {
            try {
              fn(v)
            } catch (err) {
              // Throwing from inside write() is undefined behaviour for Node
              // streams; failures must be passed to the callback instead.
              onNext(err instanceof Error ? err : new Error(String(err)))
              return
            }
            // Kept outside the try block so the callback is never invoked twice.
            onNext()
          },
        }).on('finish', () => out(Done))
      },
    },
    __phantom__: Phantom(),
  }
}
|