=str: #15802: TickSource cancellable

also, fixing intermediate materialization for Source.concat
This commit is contained in:
Endre Sándor Varga 2014-11-26 16:33:58 +01:00
parent fdf66ae3c5
commit 7344063cfc
4 changed files with 103 additions and 75 deletions

View file

@ -130,7 +130,7 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] = // FIXME why is tick () => T and not T?
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): TickSource[T] =
TickSource(initialDelay, interval, tick)
/**
@ -181,7 +181,16 @@ object Source {
* emitted by the second source is emitted after the last element of the first
* source.
*/
def concat[T](source1: Source[T], source2: Source[T]): Source[T] = ConcatSource(source1, source2)
def concat[T](source1: Source[T], source2: Source[T]): Source[T] = {
val output = UndefinedSink[T]
val concat = Concat[T]
Source() { b
b.addEdge(source1, Pipe.empty[T], concat.first)
.addEdge(source2, Pipe.empty[T], concat.second)
.addEdge(concat.out, Pipe.empty[T], output)
output
}
}
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]