|
|
@@ -18,6 +18,7 @@ import {
|
|
|
|
|
|
import { Sink } from './Sink'
|
|
|
import { Flow } from './Flow'
|
|
|
+import { Runnable } from './Runnable'
|
|
|
|
|
|
export class Source<T> extends LinearShape<T> {
|
|
|
protected __phantom__!: T
|
|
|
@@ -30,11 +31,11 @@ export class Source<T> extends LinearShape<T> {
|
|
|
return new Source(MappedStreamBuilder(this.builder, f))
|
|
|
}
|
|
|
|
|
|
- mapAsync<O>(concurrency: number, f: (v: T) => Promise<O>) {
|
|
|
+ mapAsync<O>(concurrency: number, f: (v: T) => Promise<O>): Source<O> {
|
|
|
return new Source(MappedAsyncStreamBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
|
|
|
}
|
|
|
|
|
|
- mapAsyncUnordered<O>(concurrency: number, f: (v: T) => Promise<O>) {
|
|
|
+ mapAsyncUnordered<O>(concurrency: number, f: (v: T) => Promise<O>): Source<O> {
|
|
|
return new Source(MappedAsyncUnorderedStreamBuilder(this.builder, concurrency < 1 ? 1 : 0 | concurrency, f))
|
|
|
}
|
|
|
|
|
|
@@ -73,7 +74,7 @@ export class Source<T> extends LinearShape<T> {
|
|
|
return new Source(FlatMappedStreamBuilder(this.builder, (v: T) => f(v)[builderSymbol]))
|
|
|
}
|
|
|
|
|
|
- into<M>(sink: Sink<T, M>) {
|
|
|
+ into<M>(sink: Sink<T, M>): Runnable<M> {
|
|
|
const self = this
|
|
|
return {
|
|
|
run() {
|