Adding preMaterialize operator for Source #23894 (#24497)

This commit is contained in:
Stefano Bonetti 2018-02-21 06:06:01 +00:00 committed by Konrad `ktoso` Malawski
parent e44fafd4b7
commit 3ea59b1e76
7 changed files with 135 additions and 4 deletions

View file

@ -475,6 +475,15 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] =
new Source(delegate.mapMaterializedValue(f.apply _))
/**
* Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source
* that can be used to consume elements from the newly materialized Source.
*/
def preMaterialize(materializer: Materializer): Pair[Mat @uncheckedVariance, Source[Out @uncheckedVariance, NotUsed]] = {
val (mat, src) = delegate.preMaterialize()(materializer)
Pair(mat, new Source(src))
}
/**
* Transform this [[Source]] by appending the given processing stages.
* {{{
@ -1922,8 +1931,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*
* '''Cancels when''' downstream cancels or substream cancels
*/
def prefixAndTail(n: Int): javadsl.Source[akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] =
new Source(delegate.prefixAndTail(n).map { case (taken, tail) akka.japi.Pair(taken.asJava, tail.asJava) })
def prefixAndTail(n: Int): javadsl.Source[Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] =
new Source(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail.asJava) })
/**
* This operation demultiplexes the incoming stream into separate output