filestream.test.ts 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. import * as assert from 'assert'
  2. import { Source } from '../src/Source'
  3. import * as Sink from '../src/Sink'
  4. import * as fs from 'fs'
  5. describe('file stream', function () {
  6. this.timeout(5000)
  7. it('should run a stream from a file', async () => {
  8. const value = await Source.fromReadableBuilder<Buffer>(() => fs.createReadStream('./resources/test/numbers.txt'))
  9. .map(v => v.toString('utf-8'))
  10. .statefulMapConcat<string>(() => {
  11. let current: string = ''
  12. return [
  13. v => {
  14. current = current + v
  15. if (current.includes('\n')) {
  16. const splitted = current.split('\n')
  17. current = splitted.pop()!
  18. return splitted
  19. } else {
  20. return []
  21. }
  22. },
  23. () => current.split('\n'),
  24. ]
  25. })
  26. .map((numbers: string) =>
  27. numbers
  28. .split(/\s/)
  29. .filter(v => v.length > 0)
  30. .map(v => parseInt(v, 10))
  31. .reduce((a, b) => a + b, 0),
  32. )
  33. .into(Sink.sum)
  34. .run()
  35. assert.strictEqual(
  36. value,
  37. fs
  38. .readFileSync('./resources/test/numbers.txt')
  39. .toString('utf-8')
  40. .split(/\s/)
  41. .filter(s => s.length > 0)
  42. .map(v => parseInt(v))
  43. .reduce((a, b) => a + b, 0),
  44. )
  45. })
  46. })