#18962 MergeSorted and read() completion handling

This commit is contained in:
Roland Kuhn 2015-11-18 17:41:52 +01:00
parent fda9c5d1b8
commit ac83b1965c
11 changed files with 315 additions and 54 deletions

View file

@ -16,7 +16,6 @@ import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream.stage.Stage
import akka.util.ByteString
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import scala.collection.immutable
@ -595,6 +594,37 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.mergeMat(that)(combinerToScala(matF)))
/**
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
* picking always the smallest of the available elements (waiting for one element from each side
* to be available). This means that possible contiguity of the input streams is not exploited to avoid
* waiting for elements, this merge will block when one of the inputs does not have more elements (and
* does not complete).
*
* '''Emits when''' all of the inputs have an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
*
* '''Cancels when''' downstream cancels
*/
def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M], comp: util.Comparator[U]): javadsl.Source[U, Mat] =
new Source(delegate.mergeSorted(that)(Ordering.comparatorToOrdering(comp)))
/**
* Merge the given [[Source]] to this [[Source]], taking elements as they arrive from input streams,
* picking always the smallest of the available elements (waiting for one element from each side
* to be available). This means that possible contiguity of the input streams is not exploited to avoid
* waiting for elements, this merge will block when one of the inputs does not have more elements (and
* does not complete).
*
* @see [[#mergeSorted]].
*/
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: util.Comparator[U],
matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[U, Mat3] =
new Source(delegate.mergeSortedMat(that)(combinerToScala(matF))(Ordering.comparatorToOrdering(comp)))
/**
* Combine the elements of current [[Source]] and the given one into a stream of tuples.
*