|
@@ -29,6 +29,10 @@ export class Source<T> {
|
|
|
return new Source(FilteredSourceBuilder(this.builder, predicate))
|
|
return new Source(FilteredSourceBuilder(this.builder, predicate))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ throttle(elements: number, perDurationMs: number) {
|
|
|
|
|
+ return new Source(ThrottledSourceBuilder(this.builder, elements, perDurationMs))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
into<M>(sink: Sink<T, M>) {
|
|
into<M>(sink: Sink<T, M>) {
|
|
|
const self = this
|
|
const self = this
|
|
|
return {
|
|
return {
|
|
@@ -218,3 +222,29 @@ function FilteredSourceBuilder<T>(prev: SourceBuilder<T>, predicate: (v: T) => b
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+function ThrottledSourceBuilder<T>(prev: SourceBuilder<T>, elements: number, perDurationMs: number) {
|
|
|
|
|
+ let timestamps: number[] = []
|
|
|
|
|
+ return {
|
|
|
|
|
+ build() {
|
|
|
|
|
+ return [
|
|
|
|
|
+ ...prev.build(),
|
|
|
|
|
+ new stream.Transform({
|
|
|
|
|
+ objectMode: true,
|
|
|
|
|
+ transform(v, {}, onNext) {
|
|
|
|
|
+ const current = Date.now()
|
|
|
|
|
+ timestamps = [...timestamps.filter(t => t > (current - perDurationMs)), current]
|
|
|
|
|
+ this.push(v)
|
|
|
|
|
+ if(timestamps.length >= elements) {
|
|
|
|
|
+ const timeToWait = (timestamps[0] + perDurationMs) - Date.now()
|
|
|
|
|
+ setTimeout(onNext, timeToWait)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ onNext()
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+ ]
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|