str #20262 reduce should fail explicitly on empty stream (#20267)

* str #20262 reduce should fail explicitly on empty stream

* str #20262 document reduce behaviour on empty stream
This commit is contained in:
Konrad Malawski 2016-04-11 15:36:10 +02:00
parent 503a77f515
commit b9ab232cac
8 changed files with 67 additions and 14 deletions

View file

@ -54,6 +54,24 @@ class FlowReduceSpec extends AkkaSpec {
the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
}
"fail on empty stream using Source.runReduce" in assertAllStagesStopped {
val result = Source.empty[Int].runReduce(_ + _)
val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) }
ex.getMessage should include("empty stream")
}
"fail on empty stream using Flow.reduce" in assertAllStagesStopped {
val result = Source.empty[Int].via(reduceFlow).runWith(Sink.fold(0)(_ + _))
val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) }
ex.getMessage should include("empty stream")
}
"fail on empty stream using Sink.reduce" in assertAllStagesStopped {
val result = Source.empty[Int].runWith(reduceSink)
val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) }
ex.getMessage should include("empty stream")
}
}
}

View file

@ -1158,31 +1158,36 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl
private[stream] final class Reduce[T](f: (T, T) T) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.reduce
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self
override def toString = s"Reduce.Logic(aggregator=$aggregator)"
var aggregator: T = _
// Initial input handler
setHandler(in, new InHandler {
override def onPush(): Unit = {
aggregator = grab(in)
pull(in)
setHandler(in, rest)
setHandler(in, self)
}
override def onUpstreamFinish(): Unit =
failStage(new NoSuchElementException("reduce over empty stream"))
})
def rest = new InHandler {
override def onPush(): Unit = {
aggregator = f(aggregator, grab(in))
pull(in)
}
override def onUpstreamFinish(): Unit = {
push(out, aggregator)
completeStage()
}
override def onPush(): Unit = {
aggregator = f(aggregator, grab(in))
pull(in)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = {
push(out, aggregator)
completeStage()
}
setHandler(out, self)
}
override def toString = "Reduce"
}

View file

@ -573,6 +573,11 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the reduce stage will fail its downstream with a [[NoSuchElementException]],
* which is semantically in-line with that Scala's standard library collections
* do in such situations.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures

View file

@ -36,6 +36,11 @@ object Sink {
* The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is a failure signaled in the stream.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the reduce stage will fail its downstream with a [[NoSuchElementException]],
* which is semantically in-line with that Scala's standard library collections
* do in such situations.
*/
def reduce[In](f: function.Function2[In, In, In]): Sink[In, CompletionStage[In]] =
new Sink(scaladsl.Sink.reduce[In](f.apply).toCompletionStage())

View file

@ -498,6 +498,11 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is a failure is signaled in the stream.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the reduce stage will fail its downstream with a [[NoSuchElementException]],
* which is semantically in-line with that Scala's standard library collections
* do in such situations.
*/
def runReduce[U >: Out](f: function.Function2[U, U, U], materializer: Materializer): CompletionStage[U] =
runWith(Sink.reduce(f), materializer)

View file

@ -773,6 +773,11 @@ trait FlowOps[+Out, +Mat] {
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the reduce stage will fail its downstream with a [[NoSuchElementException]],
* which is semantically in-line with that Scala's standard library collections
* do in such situations.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures

View file

@ -245,6 +245,11 @@ object Sink {
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is a failure signaled in the stream.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the reduce stage will fail its downstream with a [[NoSuchElementException]],
* which is semantically in-line with that Scala's standard library collections
* do in such situations.
*/
def reduce[T](f: (T, T) T): Sink[T, Future[T]] =
Flow[T].reduce(f).toMat(Sink.head)(Keep.right).named("reduceSink")

View file

@ -107,6 +107,11 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is a failure signaled in the stream.
*
* If the stream is empty (i.e. completes before signalling any elements),
* the reduce stage will fail its downstream with a [[NoSuchElementException]],
* which is semantically in-line with that Scala's standard library collections
* do in such situations.
*/
def runReduce[U >: Out](f: (U, U) U)(implicit materializer: Materializer): Future[U] =
runWith(Sink.reduce(f))