diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala index f8f5e9532d..231a318640 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala @@ -4,12 +4,12 @@ package akka.stream.scaladsl2 import scala.concurrent.Await -import scala.concurrent.duration._ +import scala.util.control.NoStackTrace import akka.stream.testkit.AkkaSpec +import akka.testkit.DefaultTimeout -class FlowFoldSpec extends AkkaSpec { +class FlowFoldSpec extends AkkaSpec with DefaultTimeout { implicit val mat = FlowMaterializer() - import system.dispatcher "A Fold" must { @@ -19,7 +19,15 @@ class FlowFoldSpec extends AkkaSpec { val mf = FlowFrom(input).withSink(foldSink).run() val future = foldSink.future(mf) val expected = input.fold(0)(_ + _) - Await.result(future, 5.seconds) should be(expected) + Await.result(future, timeout.duration) should be(expected) + } + + "propagate an error" in { + val error = new Exception with NoStackTrace + val foldSink = FoldSink[Unit, Unit](())((_, _) ⇒ ()) + val mf = FlowFrom[Unit](() ⇒ throw error).withSink(foldSink).run() + val future = foldSink.future(mf) + the[Exception] thrownBy Await.result(future, timeout.duration) should be(error) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 4ed115a7b5..e9947493f9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -199,12 +199,12 @@ final case class OnCompleteSink[Out](callback: Try[Unit] ⇒ Unit) extends Simpl override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = FlowFrom(flowPublisher).transform("onCompleteSink", () ⇒ new Transformer[Out, Unit] { override def onNext(in: Out) = Nil - override def onError(e: Throwable) = { - callback(Failure(e)) - throw e - } + override def onError(e: Throwable) = () override def onTermination(e: Option[Throwable]) = { - callback(OnCompleteSink.SuccessUnit) + e match { + case None ⇒ callback(OnCompleteSink.SuccessUnit) + case Some(e) ⇒ callback(Failure(e)) + } Nil } }).consume()(materializer.withNamePrefix(flowName)) @@ -248,6 +248,7 @@ final case class FoldSink[U, Out](zero: U)(f: (U, Out) ⇒ U) extends SinkWithKe FlowFrom(flowPublisher).transform("fold", () ⇒ new Transformer[Out, U] { var state: U = zero override def onNext(in: Out): immutable.Seq[U] = { state = f(state, in); Nil } + override def onError(cause: Throwable) = () override def onTermination(e: Option[Throwable]) = { e match { case None ⇒ promise.success(state)