import * as assert from 'assert' import { Source } from '../src/Source' import * as Sink from '../src/Sink' import * as fs from 'fs' describe('file stream', function () { this.timeout(5000) it('should run a stream from a file', async () => { const value = await Source.fromReadableBuilder(() => fs.createReadStream('./resources/test/numbers.txt')) .map(v => v.toString('utf-8')) .statefulMapConcat(() => { let current: string = '' return [ v => { current = current + v if (current.includes('\n')) { const splitted = current.split('\n') current = splitted.pop()! return splitted } else { return [] } }, () => current.split('\n'), ] }) .map((numbers: string) => numbers .split(/\s/) .filter(v => v.length > 0) .map(v => parseInt(v, 10)) .reduce((a, b) => a + b, 0), ) .into(Sink.sum) .run() assert.strictEqual( value, fs .readFileSync('./resources/test/numbers.txt') .toString('utf-8') .split(/\s/) .filter(s => s.length > 0) .map(v => parseInt(v)) .reduce((a, b) => a + b, 0), ) }) })