From b9ab232cacab68ce7d1b9a54abe4a00efba06a9a Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 11 Apr 2016 15:36:10 +0200 Subject: [PATCH] str #20262 reduce should fail explicitly on empty stream (#20267) * str #20262 reduce should fail explicitly on empty stream * str #20262 document reduce behaviour on empty stream --- .../akka/stream/scaladsl/FlowReduceSpec.scala | 18 ++++++++++ .../scala/akka/stream/impl/fusing/Ops.scala | 33 +++++++++++-------- .../main/scala/akka/stream/javadsl/Flow.scala | 5 +++ .../main/scala/akka/stream/javadsl/Sink.scala | 5 +++ .../scala/akka/stream/javadsl/Source.scala | 5 +++ .../scala/akka/stream/scaladsl/Flow.scala | 5 +++ .../scala/akka/stream/scaladsl/Sink.scala | 5 +++ .../scala/akka/stream/scaladsl/Source.scala | 5 +++ 8 files changed, 67 insertions(+), 14 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala index a869ea8b8d..75ae640ac5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala @@ -54,6 +54,24 @@ class FlowReduceSpec extends AkkaSpec { the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } + "fail on empty stream using Source.runReduce" in assertAllStagesStopped { + val result = Source.empty[Int].runReduce(_ + _) + val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) } + ex.getMessage should include("empty stream") + } + + "fail on empty stream using Flow.reduce" in assertAllStagesStopped { + val result = Source.empty[Int].via(reduceFlow).runWith(Sink.fold(0)(_ + _)) + val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) } + ex.getMessage should include("empty stream") + } + + "fail on empty stream using Sink.reduce" in assertAllStagesStopped { + val result = Source.empty[Int].runWith(reduceSink) + val ex = intercept[NoSuchElementException] { Await.result(result, 3.seconds) } + ex.getMessage should include("empty stream") + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d5d255443d..befbafc0be 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1158,31 +1158,36 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl private[stream] final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.reduce - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self ⇒ override def toString = s"Reduce.Logic(aggregator=$aggregator)" + var aggregator: T = _ + // Initial input handler setHandler(in, new InHandler { override def onPush(): Unit = { aggregator = grab(in) pull(in) - setHandler(in, rest) + setHandler(in, self) } + + override def onUpstreamFinish(): Unit = + failStage(new NoSuchElementException("reduce over empty stream")) }) - def rest = new InHandler { - override def onPush(): Unit = { - aggregator = f(aggregator, grab(in)) - pull(in) - } - override def onUpstreamFinish(): Unit = { - push(out, aggregator) - completeStage() - } + + override def onPush(): Unit = { + aggregator = f(aggregator, grab(in)) + pull(in) } - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - }) + override def onPull(): Unit = pull(in) + + override def onUpstreamFinish(): Unit = { + push(out, aggregator) + completeStage() + } + + setHandler(out, self) } override def toString = "Reduce" } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 477b7b1225..f1973402cf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -573,6 +573,11 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * Applies the given function towards its current and next value, * yielding the next current value. * + * If the stream is empty (i.e. completes before signalling any elements), + * the reduce stage will fail its downstream with a [[NoSuchElementException]], + * which is semantically in-line with that Scala's standard library collections + * do in such situations. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 5d9891e4ac..b8ce7d091a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -36,6 +36,11 @@ object Sink { * The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure signaled in the stream. + * + * If the stream is empty (i.e. completes before signalling any elements), + * the reduce stage will fail its downstream with a [[NoSuchElementException]], + * which is semantically in-line with that Scala's standard library collections + * do in such situations. */ def reduce[In](f: function.Function2[In, In, In]): Sink[In, CompletionStage[In]] = new Sink(scaladsl.Sink.reduce[In](f.apply).toCompletionStage()) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 55d6807e1a..4da17ced0e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -498,6 +498,11 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * The returned [[java.util.concurrent.CompletionStage]] will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure is signaled in the stream. + * + * If the stream is empty (i.e. completes before signalling any elements), + * the reduce stage will fail its downstream with a [[NoSuchElementException]], + * which is semantically in-line with that Scala's standard library collections + * do in such situations. */ def runReduce[U >: Out](f: function.Function2[U, U, U], materializer: Materializer): CompletionStage[U] = runWith(Sink.reduce(f), materializer) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 39027cb6b3..4917350b07 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -773,6 +773,11 @@ trait FlowOps[+Out, +Mat] { * Applies the given function towards its current and next value, * yielding the next current value. * + * If the stream is empty (i.e. completes before signalling any elements), + * the reduce stage will fail its downstream with a [[NoSuchElementException]], + * which is semantically in-line with that Scala's standard library collections + * do in such situations. + * * '''Emits when''' upstream completes * * '''Backpressures when''' downstream backpressures diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index d54cc724ec..1896978c61 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -245,6 +245,11 @@ object Sink { * The returned [[scala.concurrent.Future]] will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure signaled in the stream. + * + * If the stream is empty (i.e. completes before signalling any elements), + * the reduce stage will fail its downstream with a [[NoSuchElementException]], + * which is semantically in-line with that Scala's standard library collections + * do in such situations. */ def reduce[T](f: (T, T) ⇒ T): Sink[T, Future[T]] = Flow[T].reduce(f).toMat(Sink.head)(Keep.right).named("reduceSink") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index efff974b83..9021d4d9a5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -107,6 +107,11 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * The returned [[scala.concurrent.Future]] will be completed with value of the final * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure signaled in the stream. + * + * If the stream is empty (i.e. completes before signalling any elements), + * the reduce stage will fail its downstream with a [[NoSuchElementException]], + * which is semantically in-line with that Scala's standard library collections + * do in such situations. */ def runReduce[U >: Out](f: (U, U) ⇒ U)(implicit materializer: Materializer): Future[U] = runWith(Sink.reduce(f))