import * as stream from 'stream' import { Phantom } from './internal/common' export const Done = Symbol('Done') export type Done = typeof Done export interface Sink { builder: SinkBuilder __phantom__: T } interface SinkBuilder { buildStreams?: () => Array buildWritable(out: (value: M) => void): stream.Writable } export const sum: Sink = { builder: { buildWritable(out) { let result = 0 return new stream.Writable({ objectMode: true, write(v, _, onNext) { result += v onNext() }, }).on('finish', () => out(result)) }, }, __phantom__: Phantom(), } export const ignore: Sink = { builder: { buildWritable(out) { return new stream.Writable({ objectMode: true, write(v, _, onNext) { onNext() }, }).on('finish', () => out(Done)) }, }, __phantom__: Phantom(), } export function reduce(reduceFn: (acc: Mat, v: V) => Mat, zero: Mat): Sink { return { builder: { buildWritable(out) { let acc = zero return new stream.Writable({ objectMode: true, write(v, _, onNext) { acc = reduceFn(acc, v) onNext() }, }).on('finish', () => out(acc)) }, }, __phantom__: Phantom(), } } export function forEach(fn: (t: T) => void): Sink { return { builder: { buildWritable(out) { return new stream.Writable({ objectMode: true, write(v, _, onNext) { fn(v) onNext() }, }).on('finish', () => out(Done)) }, }, __phantom__: Phantom(), } }