diff --git a/akka-docs-dev/rst/java/stream-error.rst b/akka-docs-dev/rst/java/stream-error.rst new file mode 100644 index 0000000000..68e86a0e14 --- /dev/null +++ b/akka-docs-dev/rst/java/stream-error.rst @@ -0,0 +1,71 @@ +.. _stream-error-java: + +############## +Error Handling +############## + +Strategies for how to handle exceptions from processing stream elements can be defined when +materializing the stream. The error handling strategies are inspired by actor supervision +strategies, but the semantics has been adapted to the domain of stream processing. + +Supervision Strategies +====================== + +There are three ways to handle exceptions from application code: + +* ``Stop`` - The stream is completed with failure. +* ``Resume`` - The element is dropped and the stream continues. +* ``Restart`` - The element is dropped and the stream continues after restarting the stage. + Restarting a stage means that any accumulated state is cleared. This is typically + performed by creating a new instance of the stage. + + +By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with +failure when an exception is thrown. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#stop + +The default supervision strategy for a stream can be defined on the settings of the materializer. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#resume + +Here you can see that all ``ArithmeticException`` will resume the processing, i.e. the +elements that cause the division by zero are effectively dropped. + +Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-java`. + +The supervision strategy can also be defined for a section of flow operators. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#resume-section + +``Restart`` works in a similar way as ``Resume`` with the addition that accumulated state, +if any, of the failing processing stage will be reset. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#restart-section + +Errors from mapAsync +==================== + +Stream supervision can also be applied to the futures of ``mapAsync``. + +Let's say that we use an external service to lookup email addresses and we would like to +discard those that cannot be found. + +We start with the tweet stream of authors: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/IntegrationDocTest.java#tweet-authors + +Assume that we can lookup their email address using: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/IntegrationDocTest.java#email-address-lookup2 + +The ``Future`` is completed with ``Failure`` if the email is not found. + +Transforming the stream of authors to a stream of email addresses by using the ``lookupEmail`` +service can be done with ``mapAsync`` and we use ``Supervision.getResumingDecider`` to drop +unknown email addresses: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/IntegrationDocTest.java#email-addresses-mapAsync-supervision + +If we would not use ``Resume`` the default stopping strategy would complete the stream +with failure on the first ``Future`` that was completed with ``Failure``. diff --git a/akka-docs-dev/rst/java/stream-index.rst b/akka-docs-dev/rst/java/stream-index.rst index 95e9d267fd..99eaf10f37 100644 --- a/akka-docs-dev/rst/java/stream-index.rst +++ b/akka-docs-dev/rst/java/stream-index.rst @@ -14,6 +14,7 @@ Streams stream-rate stream-customize stream-integrations + stream-error stream-io stream-cookbook ../stream-configuration diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala new file mode 100644 index 0000000000..dde59b9d21 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.stream + +import scala.concurrent.Await +import akka.stream.ActorFlowMaterializer +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.Supervision +import akka.stream.scaladsl._ +import akka.stream.testkit.AkkaSpec + +class FlowErrorDocSpec extends AkkaSpec { + + "demonstrate fail stream" in { + //#stop + implicit val mat = ActorFlowMaterializer() + val source = Source(0 to 5).map(100 / _) + val result = source.runWith(Sink.fold(0)(_ + _)) + // division by zero will fail the stream and the + // result here will be a Future completed with Failure(ArithmeticException) + //#stop + + intercept[ArithmeticException] { + Await.result(result, remaining) + } + } + + "demonstrate resume stream" in { + //#resume + val decider: Supervision.Decider = exc => exc match { + case _: ArithmeticException => Supervision.Resume + case _ => Supervision.Stop + } + implicit val mat = ActorFlowMaterializer( + ActorFlowMaterializerSettings(system).withSupervisionStrategy(decider)) + val source = Source(0 to 5).map(100 / _) + val result = source.runWith(Sink.fold(0)(_ + _)) + // the element causing division by zero will be dropped + // result here will be a Future completed with Success(228) + //#resume + + Await.result(result, remaining) should be(228) + } + + "demonstrate resume section" in { + //#resume-section + implicit val mat = ActorFlowMaterializer() + val decider: Supervision.Decider = exc => exc match { + case _: ArithmeticException => Supervision.Resume + case _ => Supervision.Stop + } + val source = Source(0 to 5).section(OperationAttributes.supervisionStrategy(decider)) { + _.filter(100 / _ < 50).map(elem => 100 / (5 - elem)) + } + val result = source.runWith(Sink.fold(0)(_ + _)) + // the elements causing division by zero will be dropped + // result here will be a Future completed with Success(150) + //#resume-section + + Await.result(result, remaining) should be(150) + } + + "demonstrate restart section" in { + //#restart-section + implicit val mat = ActorFlowMaterializer() + val decider: Supervision.Decider = exc => exc match { + case _: IllegalArgumentException => Supervision.Restart + case _ => Supervision.Stop + } + val source = Source(List(1, 3, -1, 5, 7)).section( + OperationAttributes.supervisionStrategy(decider)) { + _.scan(0) { (acc, elem) => + if (elem < 0) throw new IllegalArgumentException("negative not allowed") + else acc + elem + } + } + val result = source.grouped(1000).runWith(Sink.head) + // the negative element cause the scan stage to be restarted, + // i.e. start from 0 again + // result here will be a Future completed with Success(Vector(0, 1, 0, 5, 12)) + //#restart-section + + Await.result(result, remaining) should be(Vector(0, 1, 0, 5, 12)) + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala index 635baf6d12..ceedd77045 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -22,6 +22,7 @@ import akka.stream.scaladsl.OperationAttributes import scala.concurrent.ExecutionContext import akka.stream.ActorFlowMaterializerSettings import java.util.concurrent.atomic.AtomicInteger +import akka.stream.Supervision object IntegrationDocSpec { import TwitterStreamQuickstartDocSpec._ @@ -52,6 +53,13 @@ object IntegrationDocSpec { Future.successful(Some(handle.hashCode.toString)) } + class AddressSystem2 { + //#email-address-lookup2 + def lookupEmail(handle: String): Future[String] = + //#email-address-lookup2 + Future.successful(handle + "@somewhere.com") + } + final case class Email(to: String, title: String, body: String) final case class TextMessage(to: String, body: String) @@ -159,6 +167,22 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { probe.expectMsg("akkateam@somewhere.com") } + "lookup email with mapAsync and supervision" in { + val addressSystem = new AddressSystem2 + val authors: Source[Author] = + tweets.filter(_.hashtags.contains(akka)).map(_.author) + + //#email-addresses-mapAsync-supervision + import OperationAttributes.supervisionStrategy + import Supervision.resumingDecider + + val emailAddresses: Source[String] = + authors.section(supervisionStrategy(resumingDecider)) { + _.mapAsync(author => addressSystem.lookupEmail(author.handle)) + } + //#email-addresses-mapAsync-supervision + } + "calling external service with mapAsyncUnordered" in { val probe = TestProbe() val addressSystem = new AddressSystem diff --git a/akka-docs-dev/rst/scala/stream-error.rst b/akka-docs-dev/rst/scala/stream-error.rst new file mode 100644 index 0000000000..6e77303173 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-error.rst @@ -0,0 +1,71 @@ +.. _stream-error-scala: + +############## +Error Handling +############## + +Strategies for how to handle exceptions from processing stream elements can be defined when +materializing the stream. The error handling strategies are inspired by actor supervision +strategies, but the semantics has been adapted to the domain of stream processing. + +Supervision Strategies +====================== + +There are three ways to handle exceptions from application code: + +* ``Stop`` - The stream is completed with failure. +* ``Resume`` - The element is dropped and the stream continues. +* ``Restart`` - The element is dropped and the stream continues after restarting the stage. + Restarting a stage means that any accumulated state is cleared. This is typically + performed by creating a new instance of the stage. + + +By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with +failure when an exception is thrown. + +.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#stop + +The default supervision strategy for a stream can be defined on the settings of the materializer. + +.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#resume + +Here you can see that all ``ArithmeticException`` will resume the processing, i.e. the +elements that cause the division by zero are effectively dropped. + +Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-scala`. + +The supervision strategy can also be defined for a section of flow operators. + +.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#resume-section + +``Restart`` works in a similar way as ``Resume`` with the addition that accumulated state, +if any, of the failing processing stage will be reset. + +.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#restart-section + +Errors from mapAsync +==================== + +Stream supervision can also be applied to the futures of ``mapAsync``. + +Let's say that we use an external service to lookup email addresses and we would like to +discard those that cannot be found. + +We start with the tweet stream of authors: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#tweet-authors + +Assume that we can lookup their email address using: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-address-lookup2 + +The ``Future`` is completed with ``Failure`` if the email is not found. + +Transforming the stream of authors to a stream of email addresses by using the ``lookupEmail`` +service can be done with ``mapAsync`` and we use ``Supervision.resumingDecider`` to drop +unknown email addresses: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-addresses-mapAsync-supervision + +If we would not use ``Resume`` the default stopping strategy would complete the stream +with failure on the first ``Future`` that was completed with ``Failure``. diff --git a/akka-docs-dev/rst/scala/stream-index.rst b/akka-docs-dev/rst/scala/stream-index.rst index 038f68e7d8..098ec75894 100644 --- a/akka-docs-dev/rst/scala/stream-index.rst +++ b/akka-docs-dev/rst/scala/stream-index.rst @@ -14,6 +14,7 @@ Streams stream-rate stream-customize stream-integrations + stream-error stream-io stream-cookbook ../stream-configuration diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index 2e3850887b..6af7081f02 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -10,8 +10,8 @@ import akka.stream.scaladsl.OperationAttributes._ import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } import org.reactivestreams.{ Publisher, Processor } import akka.stream.impl.fusing.Map - import scala.concurrent.Promise +import akka.stream.Supervision class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { @@ -26,7 +26,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() val (processor, _ns) = materializer.asInstanceOf[ActorFlowMaterializerImpl].processorForNode( - Ast.Fused(List(Map[Int, Int](identity)), name("identity")), flowName, 1) + Ast.Fused(List(Map[Int, Int](identity, Supervision.stoppingDecider)), name("identity")), flowName, 1) processor.asInstanceOf[Processor[Int, Int]] } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 3a22653e75..d0324d6f7b 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -10,6 +10,7 @@ import akka.testkit.TestProbe import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.concurrent.duration.FiniteDuration import akka.actor.DeadLetterSuppression +import scala.util.control.NoStackTrace object StreamTestKit { @@ -176,4 +177,6 @@ object StreamTestKit { def getPublisher: Publisher[I] = this } + + case class TE(message: String) extends RuntimeException(message) with NoStackTrace } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 9ef8d12f5b..9fe4787199 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -3,15 +3,17 @@ */ package akka.stream.impl.fusing -import akka.stream.impl.fusing.Map - import scala.util.control.NoStackTrace +import akka.stream.Supervision class InterpreterSpec extends InterpreterSpecKit { + import Supervision.stoppingDecider + import Supervision.resumingDecider + import Supervision.restartingDecider "Interpreter" must { - "implement map correctly" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1))) { + "implement map correctly" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -31,9 +33,9 @@ class InterpreterSpec extends InterpreterSpecKit { } "implement chain of maps correctly" in new TestSetup(Seq( - Map((x: Int) ⇒ x + 1), - Map((x: Int) ⇒ x * 2), - Map((x: Int) ⇒ x + 1))) { + Map((x: Int) ⇒ x + 1, stoppingDecider), + Map((x: Int) ⇒ x * 2, stoppingDecider), + Map((x: Int) ⇒ x + 1, stoppingDecider))) { lastEvents() should be(Set.empty) @@ -68,7 +70,7 @@ class InterpreterSpec extends InterpreterSpecKit { "implement one-to-many many-to-one chain correctly" in new TestSetup(Seq( Doubler(), - Filter((x: Int) ⇒ x != 0))) { + Filter((x: Int) ⇒ x != 0, stoppingDecider))) { lastEvents() should be(Set.empty) @@ -92,7 +94,7 @@ class InterpreterSpec extends InterpreterSpecKit { } "implement many-to-one one-to-many chain correctly" in new TestSetup(Seq( - Filter((x: Int) ⇒ x != 0), + Filter((x: Int) ⇒ x != 0, stoppingDecider), Doubler())) { lastEvents() should be(Set.empty) @@ -134,9 +136,9 @@ class InterpreterSpec extends InterpreterSpecKit { } "implement take inside a chain" in new TestSetup(Seq( - Filter((x: Int) ⇒ x != 0), + Filter((x: Int) ⇒ x != 0, stoppingDecider), Take(2), - Map((x: Int) ⇒ x + 1))) { + Map((x: Int) ⇒ x + 1, stoppingDecider))) { lastEvents() should be(Set.empty) @@ -156,7 +158,7 @@ class InterpreterSpec extends InterpreterSpecKit { lastEvents() should be(Set(Cancel, OnComplete, OnNext(3))) } - "implement fold" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) { + "implement fold" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -175,7 +177,7 @@ class InterpreterSpec extends InterpreterSpecKit { lastEvents() should be(Set(OnNext(3), OnComplete)) } - "implement fold with proper cancel" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) { + "implement fold with proper cancel" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) { lastEvents() should be(Set.empty) @@ -195,7 +197,7 @@ class InterpreterSpec extends InterpreterSpecKit { lastEvents() should be(Set(Cancel)) } - "work if fold completes while not in a push position" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) { + "work if fold completes while not in a push position" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) { lastEvents() should be(Set.empty) @@ -233,7 +235,8 @@ class InterpreterSpec extends InterpreterSpecKit { "implement conflate" in new TestSetup(Seq(Conflate( (in: Int) ⇒ in, - (agg: Int, x: Int) ⇒ agg + x))) { + (agg: Int, x: Int) ⇒ agg + x, + stoppingDecider))) { lastEvents() should be(Set(RequestOne)) @@ -293,10 +296,12 @@ class InterpreterSpec extends InterpreterSpecKit { "work with conflate-conflate" in new TestSetup(Seq( Conflate( (in: Int) ⇒ in, - (agg: Int, x: Int) ⇒ agg + x), + (agg: Int, x: Int) ⇒ agg + x, + stoppingDecider), Conflate( (in: Int) ⇒ in, - (agg: Int, x: Int) ⇒ agg + x))) { + (agg: Int, x: Int) ⇒ agg + x, + stoppingDecider))) { lastEvents() should be(Set(RequestOne)) @@ -366,7 +371,8 @@ class InterpreterSpec extends InterpreterSpecKit { "implement conflate-expand" in new TestSetup(Seq( Conflate( (in: Int) ⇒ in, - (agg: Int, x: Int) ⇒ agg + x), + (agg: Int, x: Int) ⇒ agg + x, + stoppingDecider), Expand( (in: Int) ⇒ in, (agg: Int) ⇒ (agg, agg)))) { @@ -407,7 +413,8 @@ class InterpreterSpec extends InterpreterSpecKit { Doubler(), Conflate( (in: Int) ⇒ in, - (agg: Int, x: Int) ⇒ agg + x))) { + (agg: Int, x: Int) ⇒ agg + x, + stoppingDecider))) { lastEvents() should be(Set(RequestOne)) upstream.onNext(1) @@ -422,11 +429,11 @@ class InterpreterSpec extends InterpreterSpecKit { } "work with jumpback table and completed elements" in new TestSetup(Seq( - Map((x: Int) ⇒ x), - Map((x: Int) ⇒ x), + Map((x: Int) ⇒ x, stoppingDecider), + Map((x: Int) ⇒ x, stoppingDecider), KeepGoing(), - Map((x: Int) ⇒ x), - Map((x: Int) ⇒ x))) { + Map((x: Int) ⇒ x, stoppingDecider), + Map((x: Int) ⇒ x, stoppingDecider))) { lastEvents() should be(Set.empty) @@ -469,56 +476,6 @@ class InterpreterSpec extends InterpreterSpecKit { "implement drop-take" in pending - val TE = new Exception("TEST") with NoStackTrace { - override def toString = "TE" - } - - "handle external failure" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1))) { - lastEvents() should be(Set.empty) - - upstream.onError(TE) - lastEvents() should be(Set(OnError(TE))) - - } - - "handle failure inside op" in new TestSetup(Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x))) { - lastEvents() should be(Set.empty) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - - upstream.onNext(2) - lastEvents() should be(Set(OnNext(2))) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - - upstream.onNext(0) - lastEvents() should be(Set(Cancel, OnError(TE))) - - } - - "handle failure inside op in middle of the chain" in new TestSetup(Seq( - Map((x: Int) ⇒ x + 1), - Map((x: Int) ⇒ if (x == 0) throw TE else x), - Map((x: Int) ⇒ x + 1))) { - - lastEvents() should be(Set.empty) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - - upstream.onNext(2) - lastEvents() should be(Set(OnNext(4))) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - - upstream.onNext(-1) - lastEvents() should be(Set(Cancel, OnError(TE))) - - } - "work with keep-going ops" in pending } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala index fa71665651..c003428f25 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -13,6 +13,7 @@ trait InterpreterSpecKit extends AkkaSpec { case class OnError(cause: Throwable) case class OnNext(elem: Any) case object RequestOne + case object RequestAnother private[akka] case class Doubler[T]() extends PushPullStage[T, T] { var oneMore: Boolean = false @@ -71,7 +72,10 @@ trait InterpreterSpecKit extends AkkaSpec { } override def onPull(ctx: BoundaryContext): Directive = { - lastEvent += RequestOne + if (lastEvent(RequestOne)) + lastEvent += RequestAnother + else + lastEvent += RequestOne ctx.exit() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index 6019aa273d..4a4f427c8f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -3,7 +3,10 @@ */ package akka.stream.impl.fusing +import akka.stream.Supervision + class InterpreterStressSpec extends InterpreterSpecKit { + import Supervision.stoppingDecider val chainLength = 1000 * 1000 val halfLength = chainLength / 2 @@ -11,7 +14,7 @@ class InterpreterStressSpec extends InterpreterSpecKit { "Interpreter" must { - "work with a massive chain of maps" in new TestSetup(Seq.fill(chainLength)(Map((x: Int) ⇒ x + 1))) { + "work with a massive chain of maps" in new TestSetup(Seq.fill(chainLength)(Map((x: Int) ⇒ x + 1, stoppingDecider))) { lastEvents() should be(Set.empty) val tstamp = System.nanoTime() @@ -33,9 +36,9 @@ class InterpreterStressSpec extends InterpreterSpecKit { info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") } - "work with a massive chain of maps with early complete" in new TestSetup(Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1)) ++ + "work with a massive chain of maps with early complete" in new TestSetup(Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider)) ++ Seq(Take(repetition / 2)) ++ - Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1))) { + Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider))) { lastEvents() should be(Set.empty) val tstamp = System.nanoTime() @@ -92,7 +95,8 @@ class InterpreterStressSpec extends InterpreterSpecKit { "work with a massive chain of conflates by overflowing to the heap" in new TestSetup(Seq.fill(100000)(Conflate( (in: Int) ⇒ in, - (agg: Int, in: Int) ⇒ agg + in)), + (agg: Int, in: Int) ⇒ agg + in, + Supervision.stoppingDecider)), forkLimit = 100, overflowToHeap = true) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala new file mode 100644 index 0000000000..37f76dfa04 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -0,0 +1,523 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import scala.util.control.NoStackTrace +import akka.stream.Supervision +import akka.stream.stage.Context +import akka.stream.stage.Directive +import akka.stream.stage.PushPullStage +import akka.stream.stage.Stage +import akka.stream.stage.TerminationDirective + +object InterpreterSupervisionSpec { + val TE = new Exception("TEST") with NoStackTrace { + override def toString = "TE" + } + + class RestartTestStage extends PushPullStage[Int, Int] { + var sum = 0 + def onPush(elem: Int, ctx: Context[Int]): Directive = { + sum += elem + ctx.push(sum) + } + + override def onPull(ctx: Context[Int]): Directive = { + ctx.pull() + } + + override def decide(t: Throwable): Supervision.Directive = Supervision.Restart + + override def restart(): Stage[Int, Int] = { + sum = 0 + this + } + } + + case class OneToManyTestStage(decider: Supervision.Decider, absorbTermination: Boolean = false) extends PushPullStage[Int, Int] { + var buf: List[Int] = Nil + def onPush(elem: Int, ctx: Context[Int]): Directive = { + buf = List(elem + 1, elem + 2, elem + 3) + ctx.push(elem) + } + + override def onPull(ctx: Context[Int]): Directive = { + if (buf.isEmpty && ctx.isFinishing) + ctx.finish() + else if (buf.isEmpty) + ctx.pull() + else { + val elem = buf.head + buf = buf.tail + if (elem == 3) throw TE + ctx.push(elem) + } + } + + override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = + if (absorbTermination) + ctx.absorbTermination() + else + ctx.finish() + + // note that resume will be turned into failure in the Interpreter if exception is thrown from onPull + override def decide(t: Throwable): Supervision.Directive = decider(t) + + override def restart(): OneToManyTestStage = copy() + } + +} + +class InterpreterSupervisionSpec extends InterpreterSpecKit { + import InterpreterSupervisionSpec._ + import Supervision.stoppingDecider + import Supervision.resumingDecider + import Supervision.restartingDecider + + "Interpreter error handling" must { + + "handle external failure" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) { + lastEvents() should be(Set.empty) + + upstream.onError(TE) + lastEvents() should be(Set(OnError(TE))) + + } + + "emit failure when op throws" in new TestSetup(Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(0) // boom + lastEvents() should be(Set(Cancel, OnError(TE))) + } + + "emit failure when op throws in middle of the chain" in new TestSetup(Seq( + Map((x: Int) ⇒ x + 1, stoppingDecider), + Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, stoppingDecider), + Map((x: Int) ⇒ x + 100, stoppingDecider))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(113))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(-1) // boom + lastEvents() should be(Set(Cancel, OnError(TE))) + } + + "resume when Map throws" in new TestSetup(Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, resumingDecider))) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(0) // boom + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(3) + lastEvents() should be(Set(OnNext(3))) + } + + "resume when Map throws in middle of the chain" in new TestSetup(Seq( + Map((x: Int) ⇒ x + 1, resumingDecider), + Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, resumingDecider), + Map((x: Int) ⇒ x + 100, resumingDecider))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(113))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(-1) // boom + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(3) + lastEvents() should be(Set(OnNext(114))) + } + + "resume when Map throws before Grouped" in new TestSetup(Seq( + Map((x: Int) ⇒ x + 1, resumingDecider), + Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider), + Grouped(3))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(-1) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(3) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(4) + lastEvents() should be(Set(OnNext(Vector(13, 14, 15)))) + } + + "complete after resume when Map throws before Grouped" in new TestSetup(Seq( + Map((x: Int) ⇒ x + 1, resumingDecider), + Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider), + Grouped(1000))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(-1) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(3) + lastEvents() should be(Set(RequestOne)) + + upstream.onComplete() + lastEvents() should be(Set(OnNext(Vector(13, 14)), OnComplete)) + } + + "restart when onPush throws" in { + val stage = new RestartTestStage { + override def onPush(elem: Int, ctx: Context[Int]): Directive = { + if (elem <= 0) throw TE + else super.onPush(elem, ctx) + } + } + + new TestSetup(Seq( + Map((x: Int) ⇒ x + 1, restartingDecider), + stage, + Map((x: Int) ⇒ x + 100, restartingDecider))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(103))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(-1) // boom + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(3) + lastEvents() should be(Set(OnNext(104))) + } + } + + "restart when onPush throws after ctx.push" in { + val stage = new RestartTestStage { + override def onPush(elem: Int, ctx: Context[Int]): Directive = { + val ret = ctx.push(sum) + super.onPush(elem, ctx) + if (elem <= 0) throw TE + ret + } + } + + new TestSetup(Seq( + Map((x: Int) ⇒ x + 1, restartingDecider), + stage, + Map((x: Int) ⇒ x + 100, restartingDecider))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(103))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(-1) // boom + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(3) + lastEvents() should be(Set(OnNext(104))) + } + } + + "fail when onPull throws" in { + val stage = new RestartTestStage { + override def onPull(ctx: Context[Int]): Directive = { + if (sum < 0) throw TE + super.onPull(ctx) + } + } + + new TestSetup(Seq( + Map((x: Int) ⇒ x + 1, restartingDecider), + stage, + Map((x: Int) ⇒ x + 100, restartingDecider))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(103))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(-5) // this will trigger failure of next requestOne (pull) + lastEvents() should be(Set(OnNext(99))) + + downstream.requestOne() // boom + lastEvents() should be(Set(OnError(TE), Cancel)) + } + } + + "resume when Filter throws" in new TestSetup(Seq( + Filter((x: Int) ⇒ if (x == 0) throw TE else true, resumingDecider))) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(0) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(3) + lastEvents() should be(Set(OnNext(3))) + } + + "resume when MapConcat throws" in new TestSetup(Seq( + MapConcat((x: Int) ⇒ if (x == 0) throw TE else List(x, -x), resumingDecider))) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(1) + lastEvents() should be(Set(OnNext(1))) + downstream.requestOne() + lastEvents() should be(Set(OnNext(-1))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(0) // boom + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(2))) + downstream.requestOne() + lastEvents() should be(Set(OnNext(-2))) + } + + "restart when Collect throws" in { + // TODO can't get type inference to work with `pf` inlined + val pf: PartialFunction[Int, Int] = + { case x: Int ⇒ if (x == 0) throw TE else x } + new TestSetup(Seq( + Collect(restartingDecider)(pf))) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(0) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(3) + lastEvents() should be(Set(OnNext(3))) + } + } + + "resume when Scan throws" in new TestSetup(Seq( + Scan(1, (acc: Int, x: Int) ⇒ if (x == 10) throw TE else acc + x, resumingDecider))) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(10) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(4) + lastEvents() should be(Set(OnNext(3))) // 1 + 2 + } + + "restart when Scan throws" in new TestSetup(Seq( + Scan(1, (acc: Int, x: Int) ⇒ if (x == 10) throw TE else acc + x, restartingDecider))) { + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(2) + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(10) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(4) + lastEvents() should be(Set(OnNext(1))) // starts over again + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(20) + lastEvents() should be(Set(OnNext(5))) // 1+4 + } + + "restart when Conflate `seed` throws" in new TestSetup(Seq(Conflate( + (in: Int) ⇒ if (in == 1) throw TE else in, + (agg: Int, x: Int) ⇒ agg + x, + restartingDecider))) { + + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set.empty) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(0), RequestOne)) + + upstream.onNext(1) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(10) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(12))) // note that 1 has been discarded + + downstream.requestOne() + lastEvents() should be(Set.empty) + } + + "restart when Conflate `aggregate` throws" in new TestSetup(Seq(Conflate( + (in: Int) ⇒ in, + (agg: Int, x: Int) ⇒ if (x == 2) throw TE else agg + x, + restartingDecider))) { + + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set.empty) + + upstream.onNext(0) + lastEvents() should be(Set(OnNext(0), RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) // boom + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(10) + lastEvents() should be(Set(RequestOne)) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(10))) // note that 1 and 2 has been discarded + + downstream.requestOne() + lastEvents() should be(Set.empty) + + upstream.onNext(4) + lastEvents() should be(Set(OnNext(4), RequestOne)) + + downstream.cancel() + lastEvents() should be(Set(Cancel)) + } + + "fail when Expand `seed` throws" in new TestSetup(Seq(Expand( + (in: Int) ⇒ if (in == 2) throw TE else in, + (agg: Int) ⇒ (agg, -math.abs(agg))))) { + + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne, OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(-1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(-1))) + + upstream.onNext(2) // boom + lastEvents() should be(Set(OnError(TE), Cancel)) + } + + "fail when Expand `extrapolate` throws" in new TestSetup(Seq(Expand( + (in: Int) ⇒ in, + (agg: Int) ⇒ if (agg == 2) throw TE else (agg, -math.abs(agg))))) { + + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne, OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(-1))) + + upstream.onNext(2) // boom + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(OnError(TE), Cancel)) + } + + "fail when onPull throws before pushing all generated elements" in { + def test(decider: Supervision.Decider, absorbTermination: Boolean): Unit = { + new TestSetup(Seq( + OneToManyTestStage(decider, absorbTermination))) { + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + upstream.onNext(1) + lastEvents() should be(Set(OnNext(1))) + + if (absorbTermination) { + upstream.onComplete() + lastEvents() should be(Set.empty) + } + + downstream.requestOne() + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + // 3 => boom + if (absorbTermination) + lastEvents() should be(Set(OnError(TE))) + else + lastEvents() should be(Set(OnError(TE), Cancel)) + } + } + + test(resumingDecider, absorbTermination = false) + test(restartingDecider, absorbTermination = false) + test(resumingDecider, absorbTermination = true) + test(restartingDecider, absorbTermination = true) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala index 78a12884e5..b718320835 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala @@ -7,21 +7,23 @@ import scala.collection.immutable import akka.stream.testkit.AkkaSpec import akka.util.ByteString import akka.stream.stage._ +import akka.stream.Supervision class IteratorInterpreterSpec extends AkkaSpec { + import Supervision.stoppingDecider "IteratorInterpreter" must { "work in the happy case" in { val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq( - Map((x: Int) ⇒ x + 1))).iterator + Map((x: Int) ⇒ x + 1, stoppingDecider))).iterator itr.toSeq should be(2 to 11) } "hasNext should not affect elements" in { val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq( - Map((x: Int) ⇒ x))).iterator + Map((x: Int) ⇒ x, stoppingDecider))).iterator itr.hasNext should be(true) itr.hasNext should be(true) @@ -40,7 +42,7 @@ class IteratorInterpreterSpec extends AkkaSpec { "throw exceptions on empty iterator" in { val itr = new IteratorInterpreter[Int, Int](List(1).iterator, Seq( - Map((x: Int) ⇒ x))).iterator + Map((x: Int) ⇒ x, stoppingDecider))).iterator itr.next() should be(1) a[NoSuchElementException] should be thrownBy { itr.next() } @@ -78,7 +80,7 @@ class IteratorInterpreterSpec extends AkkaSpec { "work with an empty iterator" in { val itr = new IteratorInterpreter[Int, Int](Iterator.empty, Seq( - Map((x: Int) ⇒ x + 1))).iterator + Map((x: Int) ⇒ x + 1, stoppingDecider))).iterator itr.hasNext should be(false) a[NoSuchElementException] should be thrownBy { itr.next() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 5aa04019cb..16dc7776c2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -4,11 +4,12 @@ package akka.stream.scaladsl import scala.concurrent.duration._ -import scala.util.control.NoStackTrace - import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings +import akka.stream.Supervision.resumingDecider +import akka.stream.scaladsl.OperationAttributes.supervisionStrategy import akka.stream.testkit._ +import akka.stream.testkit.StreamTestKit.TE import org.reactivestreams.Publisher class FlowGroupBySpec extends AkkaSpec { @@ -52,8 +53,6 @@ class FlowGroupBySpec extends AkkaSpec { } - case class TE(message: String) extends RuntimeException(message) with NoStackTrace - "groupBy" must { "work in the happy case" in new SubstreamsSupport(groupCount = 2) { val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) @@ -88,7 +87,6 @@ class FlowGroupBySpec extends AkkaSpec { s1.expectComplete() masterSubscriber.expectComplete() - } "accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) { @@ -189,6 +187,77 @@ class FlowGroupBySpec extends AkkaSpec { subscriber.expectError(e) } + + "fail stream when groupBy function throws" in { + val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe) + .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) + .runWith(Sink.publisher) + val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + + val (_, substream) = subscriber.expectNext() + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + + substreamPuppet.request(1) + substreamPuppet.expectNext(1) + + upstreamSubscription.sendNext(2) + + subscriber.expectError(exc) + substreamPuppet.expectError(exc) + upstreamSubscription.expectCancellation() + } + + "resume stream when groupBy function throws" in { + val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe).section(supervisionStrategy(resumingDecider))( + _.groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2)) + .runWith(Sink.publisher) + val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + + val (_, substream1) = subscriber.expectNext() + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) + substreamPuppet1.request(10) + substreamPuppet1.expectNext(1) + + upstreamSubscription.sendNext(2) + upstreamSubscription.sendNext(4) + + val (_, substream2) = subscriber.expectNext() + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) + substreamPuppet2.request(10) + substreamPuppet2.expectNext(4) // note that 2 was dropped + + upstreamSubscription.sendNext(3) + substreamPuppet1.expectNext(3) + + upstreamSubscription.sendNext(6) + substreamPuppet2.expectNext(6) + + upstreamSubscription.sendComplete() + subscriber.expectComplete() + substreamPuppet1.expectComplete() + substreamPuppet2.expectComplete() + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index dd022e2226..e497700982 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -8,12 +8,13 @@ import scala.concurrent.Future import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NoStackTrace - import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.testkit.TestLatch import akka.testkit.TestProbe +import akka.stream.scaladsl.OperationAttributes.supervisionStrategy +import akka.stream.Supervision.resumingDecider class FlowMapAsyncSpec extends AkkaSpec { @@ -109,5 +110,31 @@ class FlowMapAsyncSpec extends AkkaSpec { latch.countDown() } + "resume after future failure" in { + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(n ⇒ Future { + if (n == 3) throw new RuntimeException("err3") with NoStackTrace + else n + })).to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + for (n ← List(1, 2, 4, 5)) c.expectNext(n) + c.expectComplete() + } + + "resume when mapAsync throws" in { + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(n ⇒ + if (n == 3) throw new RuntimeException("err4") with NoStackTrace + else Future(n))). + to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + for (n ← List(1, 2, 4, 5)) c.expectNext(n) + c.expectComplete() + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index e22dcf807d..85ee41400a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -7,12 +7,15 @@ import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.testkit.TestLatch import akka.testkit.TestProbe +import akka.stream.scaladsl.OperationAttributes.supervisionStrategy +import akka.stream.Supervision.resumingDecider +import akka.stream.testkit.StreamTestKit.OnNext +import akka.stream.testkit.StreamTestKit.OnComplete class FlowMapAsyncUnorderedSpec extends AkkaSpec { @@ -88,7 +91,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val latch = TestLatch(1) val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).mapAsync(n ⇒ + val p = Source(1 to 5).mapAsyncUnordered(n ⇒ if (n == 3) throw new RuntimeException("err2") with NoStackTrace else { Future { @@ -103,5 +106,31 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { latch.countDown() } + "resume after future failure" in { + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(n ⇒ Future { + if (n == 3) throw new RuntimeException("err3") with NoStackTrace + else n + })).to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet + c.probe.receiveN(5).toSet should be(expected) + } + + "resume when mapAsyncUnordered throws" in { + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(n ⇒ + if (n == 3) throw new RuntimeException("err4") with NoStackTrace + else Future(n))). + to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet + c.probe.receiveN(5).toSet should be(expected) + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 445ea59ca8..790ecd7627 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -58,18 +58,18 @@ object FlowSpec { override def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): (Processor[In, Out], MaterializedMap) = { val props = op match { - case f: Fused ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)) - case Map(f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage)) - case Filter(p, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage)) - case Drop(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage)) - case Take(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage)) - case Collect(pf, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage)) - case Scan(z, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage)) - case Expand(s, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage)) - case Conflate(s, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage)) - case Buffer(n, s, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage)) - case MapConcat(f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage)) - case o ⇒ ActorProcessorFactory.props(this, o) + case f: Fused ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)) + case Map(f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Map(f, att.settings(settings).supervisionDecider)), brokenMessage)) + case Filter(p, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p, att.settings(settings).supervisionDecider)), brokenMessage)) + case Drop(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage)) + case Take(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage)) + case Collect(pf, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Collect(att.settings(settings).supervisionDecider)(pf)), brokenMessage)) + case Scan(z, f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f, att.settings(settings).supervisionDecider)), brokenMessage)) + case Expand(s, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage)) + case Conflate(s, f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f, att.settings(settings).supervisionDecider)), brokenMessage)) + case Buffer(n, s, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage)) + case MapConcat(f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f, att.settings(settings).supervisionDecider)), brokenMessage)) + case o ⇒ ActorProcessorFactory.props(this, o) } val impl = actorOf(props.withDispatcher(settings.dispatcher), s"$flowName-$n-${op.attributes.name}") (ActorProcessorFactory(impl), MaterializedMap.empty) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 009847027c..bbc2f0b7a4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -4,11 +4,13 @@ package akka.stream.scaladsl import scala.concurrent.duration._ - import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings +import akka.stream.Supervision.resumingDecider +import akka.stream.scaladsl.OperationAttributes.supervisionStrategy import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.TE import org.reactivestreams.Publisher class FlowSplitWhenSpec extends AkkaSpec { @@ -27,6 +29,7 @@ class FlowSplitWhenSpec extends AkkaSpec { def expectNext(elem: Int): Unit = probe.expectNext(elem) def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) def expectComplete(): Unit = probe.expectComplete() + def expectError(e: Throwable) = probe.expectError(e) def cancel(): Unit = subscription.cancel() } @@ -105,6 +108,82 @@ class FlowSplitWhenSpec extends AkkaSpec { s1.expectComplete() } + "fail stream when splitWhen function throws" in { + val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe) + .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) + .runWith(Sink.publisher) + val subscriber = StreamTestKit.SubscriberProbe[Source[Int]]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + + val substream = subscriber.expectNext() + val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher)) + + substreamPuppet.request(10) + substreamPuppet.expectNext(1) + + upstreamSubscription.sendNext(2) + substreamPuppet.expectNext(2) + + upstreamSubscription.sendNext(3) + + subscriber.expectError(exc) + substreamPuppet.expectError(exc) + upstreamSubscription.expectCancellation() + } + + "resume stream when splitWhen function throws" in { + val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() + val exc = TE("test") + val publisher = Source(publisherProbeProbe).section(supervisionStrategy(resumingDecider))( + _.splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0)) + .runWith(Sink.publisher) + val subscriber = StreamTestKit.SubscriberProbe[Source[Int]]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbeProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + + val substream1 = subscriber.expectNext() + val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher)) + + substreamPuppet1.request(10) + substreamPuppet1.expectNext(1) + + upstreamSubscription.sendNext(2) + substreamPuppet1.expectNext(2) + + upstreamSubscription.sendNext(3) + upstreamSubscription.sendNext(4) + substreamPuppet1.expectNext(4) // note that 3 was dropped + + upstreamSubscription.sendNext(5) + substreamPuppet1.expectNext(5) + + upstreamSubscription.sendNext(6) + substreamPuppet1.expectComplete() + val substream2 = subscriber.expectNext() + val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher)) + substreamPuppet2.request(10) + substreamPuppet2.expectNext(6) + + upstreamSubscription.sendComplete() + subscriber.expectComplete() + substreamPuppet2.expectComplete() + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala new file mode 100644 index 0000000000..0a3d9cdcf6 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import scala.collection.immutable +import scala.concurrent.duration._ +import akka.stream.ActorFlowMaterializer +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.testkit.AkkaSpec +import scala.util.control.NoStackTrace +import scala.concurrent.Await +import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.Supervision + +class FlowSupervisionSpec extends AkkaSpec { + import OperationAttributes.supervisionStrategy + + implicit val materializer = ActorFlowMaterializer()(system) + + val exc = new RuntimeException("simulated exc") with NoStackTrace + + val failingMap = (s: Source[Int]) ⇒ s.map(n ⇒ if (n == 3) throw exc else n) + + // FIXME this would be more elegant with Flow[Int, Int] and `via`, but `via` is currently not propagating the OperationAttributes + def run(s: Source[Int] ⇒ Source[Int]): immutable.Seq[Int] = + Await.result(s(Source(1 to 5)).grouped(1000).runWith(Sink.head), 3.seconds) + + "Stream supervision" must { + + "stop and complete stream with failure by default" in { + intercept[RuntimeException] { + run(failingMap) + } should be(exc) + } + + "support resume " in { + val result = run(s ⇒ s.section(supervisionStrategy(Supervision.resumingDecider))( + failingMap(_))) + result should be(List(1, 2, 4, 5)) + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 8386af3497..ca1fa4e366 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -18,6 +18,7 @@ import org.reactivestreams.Subscriber import scala.concurrent.duration._ import akka.actor.Props import akka.actor.ActorRef +import akka.stream.javadsl.japi object ActorFlowMaterializer { @@ -196,6 +197,7 @@ object ActorFlowMaterializerSettings { initialInputBufferSize = config.getInt("initial-input-buffer-size"), maxInputBufferSize = config.getInt("max-input-buffer-size"), dispatcher = config.getString("dispatcher"), + supervisionDecider = Supervision.stoppingDecider, subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config), fileIODispatcher = config.getString("file-io-dispatcher"), debugLogging = config.getBoolean("debug-logging"), @@ -230,6 +232,7 @@ final case class ActorFlowMaterializerSettings( initialInputBufferSize: Int, maxInputBufferSize: Int, dispatcher: String, + supervisionDecider: Supervision.Decider, subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, fileIODispatcher: String, // FIXME Why does this exist?! debugLogging: Boolean, @@ -247,6 +250,22 @@ final case class ActorFlowMaterializerSettings( def withDispatcher(dispatcher: String): ActorFlowMaterializerSettings = copy(dispatcher = dispatcher) + /** + * Scala API: Decides how exceptions from application code are to be handled, unless + * overridden for specific sections of the stream operations with + * [[akka.stream.scaladsl.OperationAttributes#supervisionStrategy]]. + */ + def withSupervisionStrategy(decider: Supervision.Decider): ActorFlowMaterializerSettings = + copy(supervisionDecider = decider) + + /** + * Java API: Decides how exceptions from application code are to be handled, unless + * overridden for specific sections of the stream operations with + * [[akka.stream.javadsl.OperationAttributes#supervisionStrategy]]. + */ + def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = + copy(supervisionDecider = e ⇒ decider.apply(e)) + def withDebugLogging(enable: Boolean): ActorFlowMaterializerSettings = copy(debugLogging = enable) diff --git a/akka-stream/src/main/scala/akka/stream/Supervision.scala b/akka-stream/src/main/scala/akka/stream/Supervision.scala new file mode 100644 index 0000000000..2957f8fe44 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/Supervision.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream + +import scala.util.control.NonFatal +import akka.stream.javadsl.japi + +object Supervision { + sealed trait Directive + + /** + * The stream will be completed with failure if application code for processing an element + * throws an exception. + */ + case object Stop extends Directive + + /** + * Java API: The stream will be completed with failure if application code for processing an element + * throws an exception. + */ + def stop = Stop + + /** + * The element is dropped and the stream continues if application code for processing + * an element throws an exception. + */ + case object Resume extends Directive + + /** + * Java API: The element is dropped and the stream continues if application code for processing + * an element throws an exception. + */ + def resume = Resume + + /** + * The element is dropped and the stream continues after restarting the stage + * if application code for processing an element throws an exception. + * Restarting a stage means that any accumulated state is cleared. This is typically + * performed by creating a new instance of the stage. + */ + case object Restart extends Directive + + /** + * Java API: The element is dropped and the stream continues after restarting the stage + * if application code for processing an element throws an exception. + * Restarting a stage means that any accumulated state is cleared. This is typically + * performed by creating a new instance of the stage. + */ + def restart = Restart + + type Decider = Function[Throwable, Directive] + + /** + * Scala API: [[Decider]] that returns [[Stop]] for all exceptions. + */ + val stoppingDecider: Decider = { + case NonFatal(_) ⇒ Stop + } + + /** + * Java API: Decider function that returns [[#stop]] for all exceptions. + */ + val getStoppingDecider: japi.Function[Throwable, Directive] = + new japi.Function[Throwable, Directive] { + override def apply(e: Throwable): Directive = stoppingDecider(e) + } + + /** + * Scala API: [[Decider]] that returns [[Resume]] for all exceptions. + */ + val resumingDecider: Decider = { + case NonFatal(_) ⇒ Resume + } + + /** + * Java API: Decider function that returns [[#resume]] for all exceptions. + */ + val getResumingDecider: japi.Function[Throwable, Directive] = + new japi.Function[Throwable, Directive] { + override def apply(e: Throwable): Directive = resumingDecider(e) + } + + /** + * Scala API: [[Decider]] that returns [[Restart]] for all exceptions. + */ + val restartingDecider: Decider = { + case NonFatal(_) ⇒ Restart + } + + /** + * Java API: Decider function that returns [[#restart]] for all exceptions. + */ + val getRestartingDecider: japi.Function[Throwable, Directive] = + new japi.Function[Throwable, Directive] { + override def apply(e: Throwable): Directive = restartingDecider(e) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 55a64b73cf..3cb0df183a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -317,16 +317,16 @@ case class ActorFlowMaterializerImpl( // Optimizations below case noMatch if !optimizations.fusion ⇒ prev - case Ast.Map(f, _) ⇒ fusing.Map(f) :: prev - case Ast.Filter(p, _) ⇒ fusing.Filter(p) :: prev + case Ast.Map(f, att) ⇒ fusing.Map(f, att.settings(settings).supervisionDecider) :: prev + case Ast.Filter(p, att) ⇒ fusing.Filter(p, att.settings(settings).supervisionDecider) :: prev case Ast.Drop(n, _) ⇒ fusing.Drop(n) :: prev case Ast.Take(n, _) ⇒ fusing.Take(n) :: prev - case Ast.Collect(pf, _) ⇒ fusing.Collect(pf) :: prev - case Ast.Scan(z, f, _) ⇒ fusing.Scan(z, f) :: prev + case Ast.Collect(pf, att) ⇒ fusing.Collect(att.settings(settings).supervisionDecider)(pf) :: prev + case Ast.Scan(z, f, att) ⇒ fusing.Scan(z, f, att.settings(settings).supervisionDecider) :: prev case Ast.Expand(s, f, _) ⇒ fusing.Expand(s, f) :: prev - case Ast.Conflate(s, f, _) ⇒ fusing.Conflate(s, f) :: prev + case Ast.Conflate(s, f, att) ⇒ fusing.Conflate(s, f, att.settings(settings).supervisionDecider) :: prev case Ast.Buffer(n, s, _) ⇒ fusing.Buffer(n, s) :: prev - case Ast.MapConcat(f, _) ⇒ fusing.MapConcat(f) :: prev + case Ast.MapConcat(f, att) ⇒ fusing.MapConcat(f, att.settings(settings).supervisionDecider) :: prev case Ast.Grouped(n, _) ⇒ fusing.Grouped(n) :: prev //FIXME Add more fusion goodies here case _ ⇒ prev @@ -555,23 +555,33 @@ private[akka] object ActorProcessorFactory { def props(materializer: ActorFlowMaterializer, op: AstNode): Props = { val settings = materializer.settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW op match { - case Fused(ops, _) ⇒ ActorInterpreter.props(settings, ops) - case Map(f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Map(f))) - case Filter(p, _) ⇒ ActorInterpreter.props(settings, List(fusing.Filter(p))) - case Drop(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Drop(n))) - case Take(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Take(n))) - case Collect(pf, _) ⇒ ActorInterpreter.props(settings, List(fusing.Collect(pf))) - case Scan(z, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Scan(z, f))) - case Expand(s, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Expand(s, f))) - case Conflate(s, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Conflate(s, f))) - case Buffer(n, s, _) ⇒ ActorInterpreter.props(settings, List(fusing.Buffer(n, s))) - case MapConcat(f, _) ⇒ ActorInterpreter.props(settings, List(fusing.MapConcat(f))) - case MapAsync(f, _) ⇒ MapAsyncProcessorImpl.props(settings, f) - case MapAsyncUnordered(f, _) ⇒ MapAsyncUnorderedProcessorImpl.props(settings, f) - case Grouped(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Grouped(n))) - case GroupBy(f, _) ⇒ GroupByProcessorImpl.props(settings, f) - case PrefixAndTail(n, _) ⇒ PrefixAndTailImpl.props(settings, n) - case SplitWhen(p, _) ⇒ SplitWhenProcessorImpl.props(settings, p) + case Fused(ops, _) ⇒ ActorInterpreter.props(settings, ops) + // FIXME this way of grabbing the supervisionDecider feels very inefficient + case Map(f, att) ⇒ + ActorInterpreter.props(settings, List(fusing.Map(f, att.settings(settings).supervisionDecider))) + case Filter(p, att) ⇒ + ActorInterpreter.props(settings, List(fusing.Filter(p, att.settings(settings).supervisionDecider))) + case Drop(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Drop(n))) + case Take(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Take(n))) + case Collect(pf, att) ⇒ + ActorInterpreter.props(settings, List(fusing.Collect(att.settings(settings).supervisionDecider)(pf))) + case Scan(z, f, att) ⇒ + ActorInterpreter.props(settings, List(fusing.Scan(z, f, att.settings(settings).supervisionDecider))) + case Expand(s, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Expand(s, f))) + case Conflate(s, f, att) ⇒ + ActorInterpreter.props(settings, List(fusing.Conflate(s, f, att.settings(settings).supervisionDecider))) + case Buffer(n, s, _) ⇒ ActorInterpreter.props(settings, List(fusing.Buffer(n, s))) + case MapConcat(f, att) ⇒ + ActorInterpreter.props(settings, List(fusing.MapConcat(f, att.settings(settings).supervisionDecider))) + case MapAsync(f, att) ⇒ MapAsyncProcessorImpl.props(att.settings(settings), f) + case MapAsyncUnordered(f, att) ⇒ MapAsyncUnorderedProcessorImpl.props(att.settings(settings), f) + // FIXME always amend settings with att.settings(settings) + case Grouped(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Grouped(n))) + case GroupBy(f, att) ⇒ + GroupByProcessorImpl.props(att.settings(settings), f) + case PrefixAndTail(n, _) ⇒ PrefixAndTailImpl.props(settings, n) + case SplitWhen(p, att) ⇒ + SplitWhenProcessorImpl.props(att.settings(settings), p) case ConcatAll(_) ⇒ ConcatAllImpl.props(materializer) //FIXME closes over the materializer, is this good? case StageFactory(mkStage, _) ⇒ ActorInterpreter.props(settings, List(mkStage())) case TimerTransform(mkStage, _) ⇒ TimerTransformerProcessorsImpl.props(settings, mkStage()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index b3f1ad0663..1a44b1d8f5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -3,9 +3,11 @@ */ package akka.stream.impl -import akka.stream.ActorFlowMaterializerSettings -import akka.stream.scaladsl.Source +import scala.util.control.NonFatal import akka.actor.Props +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.Supervision +import akka.stream.scaladsl.Source /** * INTERNAL API @@ -13,6 +15,8 @@ import akka.actor.Props private[akka] object GroupByProcessorImpl { def props(settings: ActorFlowMaterializerSettings, keyFor: Any ⇒ Any): Props = Props(new GroupByProcessorImpl(settings, keyFor)) + + private case object Drop } /** @@ -22,7 +26,9 @@ private[akka] class GroupByProcessorImpl(settings: ActorFlowMaterializerSettings extends MultiStreamOutputProcessor(settings) { import MultiStreamOutputProcessor._ + import GroupByProcessorImpl.Drop + val decider = settings.supervisionDecider var keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput] var pendingSubstreamOutput: SubstreamOutput = _ @@ -30,22 +36,34 @@ private[akka] class GroupByProcessorImpl(settings: ActorFlowMaterializerSettings // No substream is open yet. If downstream cancels now, we are complete val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - nextPhase(openSubstream(elem, key)) + tryKeyFor(elem) match { + case Drop ⇒ + case key ⇒ nextPhase(openSubstream(elem, key)) + } } // some substreams are open now. If downstream cancels, we still continue until the substreams are closed val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - - keyToSubstreamOutput.get(key) match { - case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key))) - case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) - case _ ⇒ // stay + tryKeyFor(elem) match { + case Drop ⇒ + case key ⇒ + keyToSubstreamOutput.get(key) match { + case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key))) + case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) + case _ ⇒ // stay + } } } + private def tryKeyFor(elem: Any): Any = + try keyFor(elem) catch { + case NonFatal(e) if decider(e) != Supervision.Stop ⇒ + if (settings.debugLogging) + log.debug("Dropped element [{}] due to exception from groupBy function: {}", elem, e.getMessage) + Drop + } + def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemandOrCancel) { () ⇒ if (primaryOutputs.isClosed) { // Just drop, we do not open any more substreams diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala index f85fda2674..c75f07e304 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala @@ -13,6 +13,7 @@ import akka.pattern.pipe import scala.annotation.tailrec import akka.actor.Props import akka.actor.DeadLetterSuppression +import akka.stream.Supervision /** * INTERNAL API @@ -32,6 +33,7 @@ private[akka] object MapAsyncProcessorImpl { final case class FutureElement(seqNo: Long, element: Any) extends DeadLetterSuppression final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression + final case class RecoveredError(in: Any, cause: Throwable) } /** @@ -44,6 +46,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin // Execution context for pipeTo and friends import context.dispatcher + val decider = settings.supervisionDecider var submittedSeqNo = 0L var doneSeqNo = 0L def gap: Long = submittedSeqNo - doneSeqNo @@ -54,7 +57,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin // keep future results arriving too early in a buffer sorted by seqNo var orderedBuffer = TreeSet.empty[FutureElement] - override def activeReceive = futureReceive orElse super.activeReceive + override def activeReceive = futureReceive.orElse[Any, Unit](super.activeReceive) def drainBuffer(): List[Any] = { @@ -94,10 +97,10 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin if (!primaryOutputs.demandAvailable) throw new IllegalStateException if (orderedBuffer.isEmpty) { - primaryOutputs.enqueueOutputElement(element) + emit(element) } else { - primaryOutputs.enqueueOutputElement(element) - drainBuffer() foreach primaryOutputs.enqueueOutputElement + emit(element) + drainBuffer() foreach emit } pump() } else { @@ -110,6 +113,14 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin fail(cause) } + def emit(element: Any): Unit = element match { + case RecoveredError(in, err) ⇒ + if (settings.debugLogging) + log.debug("Dropped element [{}] due to mapAsync future was completed with exception: {}", in, err.getMessage) + case elem ⇒ + primaryOutputs.enqueueOutputElement(element) + } + override def onError(e: Throwable): Unit = { // propagate upstream failure immediately fail(e) @@ -126,16 +137,27 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin nextPhase(completedPhase) } else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) { val elem = primaryInputs.dequeueInputElement() - submittedSeqNo += 1 - val seqNo = submittedSeqNo try { - f(elem).map(FutureElement(seqNo, _)).recover { + val future = f(elem) + submittedSeqNo += 1 + val seqNo = submittedSeqNo + future.map(FutureElement(seqNo, _)).recover { + case err: Throwable if decider(err) != Supervision.Stop ⇒ + FutureElement(seqNo, RecoveredError(elem, err)) case err ⇒ FutureFailure(err) }.pipeTo(self) } catch { case NonFatal(err) ⇒ - // f threw, propagate failure immediately - fail(err) + // f threw, handle failure immediately + decider(err) match { + case Supervision.Stop ⇒ + fail(err) + case Supervision.Resume | Supervision.Restart ⇒ + // submittedSeqNo was not increased, just continue + if (settings.debugLogging) + log.debug("Dropped element [{}] due to exception from mapAsync factory function: {}", elem, err.getMessage) + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala index b0a56be874..c7016485c1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala @@ -10,6 +10,7 @@ import akka.stream.ActorFlowMaterializerSettings import akka.pattern.pipe import akka.actor.Props import akka.actor.DeadLetterSuppression +import akka.stream.Supervision /** * INTERNAL API @@ -19,7 +20,7 @@ private[akka] object MapAsyncUnorderedProcessorImpl { Props(new MapAsyncUnorderedProcessorImpl(settings, f)) final case class FutureElement(element: Any) extends DeadLetterSuppression - final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression + final case class FutureFailure(in: Any, cause: Throwable) extends DeadLetterSuppression } /** @@ -32,9 +33,10 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali // Execution context for pipeTo and friends import context.dispatcher + val decider = settings.supervisionDecider var inProgressCount = 0 - override def activeReceive = futureReceive orElse super.activeReceive + override def activeReceive = futureReceive.orElse[Any, Unit](super.activeReceive) def futureReceive: Receive = { case FutureElement(element) ⇒ @@ -46,8 +48,17 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali primaryOutputs.enqueueOutputElement(element) pump() - case FutureFailure(cause) ⇒ - fail(cause) + case FutureFailure(in, err) ⇒ + decider(err) match { + case Supervision.Stop ⇒ + fail(err) + case Supervision.Resume | Supervision.Restart ⇒ + inProgressCount -= 1 + if (settings.debugLogging) + log.debug("Dropped element [{}] due to mapAsyncUnordered future was completed with exception: {}", + in, err.getMessage) + pump() + } } override def onError(e: Throwable): Unit = { @@ -66,15 +77,23 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali nextPhase(completedPhase) } else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - inProgressCount > 0) { val elem = primaryInputs.dequeueInputElement() - inProgressCount += 1 try { - f(elem).map(FutureElement.apply).recover { - case err ⇒ FutureFailure(err) + val future = f(elem) + inProgressCount += 1 + future.map(FutureElement.apply).recover { + case err ⇒ FutureFailure(elem, err) }.pipeTo(self) } catch { case NonFatal(err) ⇒ // f threw, propagate failure immediately - fail(err) + decider(err) match { + case Supervision.Stop ⇒ + fail(err) + case Supervision.Resume | Supervision.Restart ⇒ + // inProgressCount was not increased, just continue + if (settings.debugLogging) + log.debug("Dropped element [{}] due to exception from mapAsyncUnordered factory function: {}", elem, err.getMessage) + } } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index aee6bdcbcb..9be72dc1e7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -3,9 +3,11 @@ */ package akka.stream.impl -import akka.stream.ActorFlowMaterializerSettings -import akka.stream.scaladsl.Source +import scala.util.control.NonFatal import akka.actor.Props +import akka.stream.ActorFlowMaterializerSettings +import akka.stream.Supervision +import akka.stream.scaladsl.Source /** * INTERNAL API @@ -13,6 +15,11 @@ import akka.actor.Props private[akka] object SplitWhenProcessorImpl { def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any ⇒ Boolean): Props = Props(new SplitWhenProcessorImpl(settings, splitPredicate)) + + private trait SplitDecision + private case object Split extends SplitDecision + private case object Continue extends SplitDecision + private case object Drop extends SplitDecision } /** @@ -22,7 +29,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSetti extends MultiStreamOutputProcessor(_settings) { import MultiStreamOutputProcessor._ + import SplitWhenProcessorImpl._ + val decider = settings.supervisionDecider var currentSubstream: SubstreamOutput = _ val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ @@ -46,19 +55,33 @@ private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSetti // Note that this phase is allocated only once per _slice_ and not per element def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ val elem = primaryInputs.dequeueInputElement() - if (splitPredicate(elem)) { - completeSubstreamOutput(currentSubstream.key) - currentSubstream = null - nextPhase(openSubstream(elem)) - } else substream.enqueueOutputElement(elem) + decideSplit(elem) match { + case Continue ⇒ substream.enqueueOutputElement(elem) + case Split ⇒ + completeSubstreamOutput(currentSubstream.key) + currentSubstream = null + nextPhase(openSubstream(elem)) + case Drop ⇒ // drop elem and continue + } } // Ignore elements for a cancelled substream until a new substream needs to be opened val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { () ⇒ val elem = primaryInputs.dequeueInputElement() - if (splitPredicate(elem)) nextPhase(openSubstream(elem)) + decideSplit(elem) match { + case Continue | Drop ⇒ // ignore elem + case Split ⇒ nextPhase(openSubstream(elem)) + } } + private def decideSplit(elem: Any): SplitDecision = + try if (splitPredicate(elem)) Split else Continue catch { + case NonFatal(e) if decider(e) != Supervision.Stop ⇒ + if (settings.debugLogging) + log.debug("Dropped element [{}] due to exception from splitWhen function: {}", elem, e.getMessage) + Drop + } + nextPhase(waitFirst) override def completeSubstreamOutput(substream: SubstreamKey): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 9159cf24c2..6c635a7d45 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -7,6 +7,7 @@ import scala.annotation.tailrec import scala.collection.breakOut import scala.util.control.NonFatal import akka.stream.stage._ +import akka.stream.Supervision // TODO: // fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions) @@ -30,6 +31,11 @@ import akka.stream.stage._ private[akka] abstract class BoundaryStage extends AbstractStage[Any, Any, Directive, Directive, BoundaryContext] { private[fusing] var bctx: BoundaryContext = _ def enter(): BoundaryContext = bctx + + final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop + + final override def restart(): BoundaryStage = + throw new UnsupportedOperationException("BoundaryStage doesn't support restart") } /** @@ -318,6 +324,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: currentOp.allowedToPush = true super.hold() } + } private final val Completing: State = new State { @@ -396,7 +403,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: ("----" * (activeOpIndex - jumpBacks(activeOpIndex) - 1)) case Completing ⇒ padding + "---|" case Cancelling ⇒ padding + "|---" - case Failing(e) ⇒ padding + s"---X ${e.getMessage}" + case Failing(e) ⇒ padding + s"---X ${e.getMessage} => ${decide(e)}" case other ⇒ padding + s"---? $state" } println(icon) @@ -410,7 +417,13 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: } catch { case NonFatal(e) if lastOpFailing != activeOpIndex ⇒ lastOpFailing = activeOpIndex - state.fail(e) + decide(e) match { + case Supervision.Stop ⇒ state.fail(e) + case Supervision.Resume ⇒ state.pull() + case Supervision.Restart ⇒ + pipeline(activeOpIndex) = pipeline(activeOpIndex).restart().asInstanceOf[UntypedOp] + state.pull() + } } } @@ -425,6 +438,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: } } + def decide(e: Throwable): Supervision.Directive = + if (state == Pulling || state.isHolding) Supervision.Stop + else currentOp.decide(e) + /** * Forks off execution of the pipeline by saving current position, fully executing the effects of the given * forkState then setting back the position to the saved value. diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 70bc750c52..f3495155d1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -7,21 +7,26 @@ import scala.collection.immutable import akka.stream.OverflowStrategy import akka.stream.impl.FixedSizeBuffer import akka.stream.stage._ +import akka.stream.Supervision /** * INTERNAL API */ -private[akka] final case class Map[In, Out](f: In ⇒ Out) extends PushStage[In, Out] { +private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] { override def onPush(elem: In, ctx: Context[Out]): Directive = ctx.push(f(elem)) + + override def decide(t: Throwable): Supervision.Directive = decider(t) } /** * INTERNAL API */ -private[akka] final case class Filter[T](p: T ⇒ Boolean) extends PushStage[T, T] { +private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] { override def onPush(elem: T, ctx: Context[T]): Directive = if (p(elem)) ctx.push(elem) else ctx.pull() + + override def decide(t: Throwable): Supervision.Directive = decider(t) } private[akka] final object Collect { @@ -31,19 +36,21 @@ private[akka] final object Collect { final val NotApplied: Any ⇒ Any = _ ⇒ Collect.NotApplied } -private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends PushStage[In, Out] { +private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf: PartialFunction[In, Out]) extends PushStage[In, Out] { import Collect.NotApplied override def onPush(elem: In, ctx: Context[Out]): Directive = pf.applyOrElse(elem, NotApplied) match { case NotApplied ⇒ ctx.pull() case result: Out @unchecked ⇒ ctx.push(result) } + + override def decide(t: Throwable): Supervision.Directive = decider(t) } /** * INTERNAL API */ -private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends PushPullStage[In, Out] { +private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] { private var currentIterator: Iterator[Out] = Iterator.empty override def onPush(elem: In, ctx: Context[Out]): Directive = { @@ -59,6 +66,10 @@ private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = ctx.absorbTermination() + + override def decide(t: Throwable): Supervision.Directive = decider(t) + + override def restart(): MapConcat[In, Out] = copy() } /** @@ -90,7 +101,7 @@ private[akka] final case class Drop[T](count: Int) extends PushStage[T, T] { /** * INTERNAL API */ -private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends PushPullStage[In, Out] { +private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] { private var aggregator = zero override def onPush(elem: In, ctx: Context[Out]): Directive = { @@ -104,12 +115,16 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex else ctx.pull() override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = ctx.absorbTermination() + + override def decide(t: Throwable): Supervision.Directive = decider(t) + + override def restart(): Scan[In, Out] = copy() } /** * INTERNAL API */ -private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends PushPullStage[In, Out] { +private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] { private var aggregator = zero override def onPush(elem: In, ctx: Context[Out]): Directive = { @@ -122,6 +137,10 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex else ctx.pull() override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = ctx.absorbTermination() + + override def decide(t: Throwable): Supervision.Directive = decider(t) + + override def restart(): Fold[In, Out] = copy() } /** @@ -229,12 +248,14 @@ private[akka] final case class Completed[T]() extends PushPullStage[T, T] { /** * INTERNAL API */ -private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedStage[In, Out] { +private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out, + decider: Supervision.Decider) extends DetachedStage[In, Out] { private var agg: Any = null override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = { - agg = if (agg == null) seed(elem) - else aggregate(agg.asInstanceOf[Out], elem) + agg = + if (agg == null) seed(elem) + else aggregate(agg.asInstanceOf[Out], elem) if (!ctx.isHolding) ctx.pull() else { @@ -255,12 +276,17 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O } else if (agg == null) ctx.hold() else { val result = agg.asInstanceOf[Out] + if (result == null) throw new NullPointerException agg = null ctx.push(result) } } override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = ctx.absorbTermination() + + override def decide(t: Throwable): Supervision.Directive = decider(t) + + override def restart(): Conflate[In, Out] = copy() } /** @@ -302,4 +328,9 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol if (expanded) ctx.finish() else ctx.absorbTermination() } + + final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop + + final override def restart(): Expand[In, Out, Seed] = + throw new UnsupportedOperationException("Expand doesn't support restart") } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b70884fc8d..1f991476f6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -152,6 +152,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * downstream may run in parallel and may complete in any order, but the elements that * are emitted downstream are in the same order as received from upstream. * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision#stop]] + * the stream will be completed with failure. + * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision#resume]] or + * [[akka.stream.Supervision#restart]] the element is dropped and the stream continues. + * * @see [[#mapAsyncUnordered]] */ def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] = @@ -165,6 +173,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream * in the same order as received from upstream. * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision#stop]] + * the stream will be completed with failure. + * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision#resume]] or + * [[akka.stream.Supervision#restart]] the element is dropped and the stream continues. + * * @see [[#mapAsync]] */ def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] = @@ -198,6 +214,10 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * emits its current value which starts at `zero` and then * applies the current and next value to the given function `f`, * emitting the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision#restart]] current value starts at `zero` again + * the stream will continue. */ def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T] = new Flow(delegate.scan(zero)(f.apply)) @@ -275,6 +295,9 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream * subscriber. * + * Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure. + * * @param seed Provides the first state for extrapolation using the first unconsumed element * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. @@ -322,6 +345,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * stop this processor from processing more elements, therefore you must take * care to unblock (or cancel) all of the produced streams even if you want * to consume only one of them. + * + * If the group by function `f` throws an exception and the supervision decision + * is [[akka.stream.Supervision#stop]] the stream and substreams will be completed + * with failure. + * + * If the group by function `f` throws an exception and the supervision decision + * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]] + * the element is dropped and the stream and substreams continue. */ def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] = new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step @@ -338,6 +369,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * true, false, // elements go into second substream * true, false, false // elements go into third substream * }}} + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision#stop]] the stream and substreams will be completed + * with failure. + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]] + * the element is dropped and the stream and substreams continue. */ def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out]] = new Flow(delegate.splitWhen(p.test).map(_.asJava)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala index 007a6aa1c8..24df702dfb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala @@ -4,6 +4,7 @@ package akka.stream.javadsl import akka.stream.scaladsl +import akka.stream.Supervision /** * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] @@ -50,6 +51,14 @@ object OperationAttributes { private[akka] def asScala = scaladsl.OperationAttributes.dispatcher(dispatcher) } + /** + * Decides how exceptions from application code are to be handled. + */ + def supervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): OperationAttributes = + new OperationAttributes { + private[akka] def asScala = scaladsl.OperationAttributes.supervisionStrategy(e ⇒ decider.apply(e)) + } + private[akka] val none: OperationAttributes = new OperationAttributes { private[akka] def asScala = scaladsl.OperationAttributes.none } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1ce84ab054..9b8cc30a42 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -160,6 +160,14 @@ trait FlowOps[+Out] { * downstream may run in parallel and may complete in any order, but the elements that * are emitted downstream are in the same order as received from upstream. * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision.Stop]] + * the stream will be completed with failure. + * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or + * [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues. + * * @see [[#mapAsyncUnordered]] */ def mapAsync[T](f: Out ⇒ Future[T]): Repr[T] = @@ -173,6 +181,14 @@ trait FlowOps[+Out] { * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream * in the same order as received from upstream. * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision.Stop]] + * the stream will be completed with failure. + * + * If the group by function `f` throws an exception or if the `Future` is completed + * with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or + * [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues. + * * @see [[#mapAsync]] */ def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[T] = @@ -203,6 +219,10 @@ trait FlowOps[+Out] { * emits its current value which starts at `zero` and then * applies the current and next value to the given function `f`, * emitting the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. */ def scan[T](zero: T)(f: (T, Out) ⇒ T): Repr[T] = andThen(Scan(zero, f.asInstanceOf[(Any, Any) ⇒ Any])) @@ -325,6 +345,9 @@ trait FlowOps[+Out] { * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream * subscriber. * + * Expand does not support [[akka.stream.Supervision.Restart]] and [[akka.stream.Supervision.Resume]]. + * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure. + * * @param seed Provides the first state for extrapolation using the first unconsumed element * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. @@ -369,6 +392,14 @@ trait FlowOps[+Out] { * stop this processor from processing more elements, therefore you must take * care to unblock (or cancel) all of the produced streams even if you want * to consume only one of them. + * + * If the group by function `f` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed + * with failure. + * + * If the group by function `f` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] + * the element is dropped and the stream and substreams continue. */ def groupBy[K, U >: Out](f: Out ⇒ K): Repr[(K, Source[U])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) @@ -385,6 +416,14 @@ trait FlowOps[+Out] { * true, false, // elements go into second substream * true, false, false // elements go into third substream * }}} + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed + * with failure. + * + * If the split predicate `p` throws an exception and the supervision decision + * is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] + * the element is dropped and the stream and substreams continue. */ def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala index efab3f5287..402f185c7b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.ActorFlowMaterializerSettings import akka.stream.impl.Ast.AstNode +import akka.stream.Supervision /** * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] @@ -17,11 +18,10 @@ final case class OperationAttributes private (private val attributes: List[Opera /** * Adds given attributes to the end of these attributes. */ - def and(other: OperationAttributes): OperationAttributes = { - // FIXME should return `this` if other.attributes is empty - // FIXME should return `other` if this is `none` - OperationAttributes(attributes ::: other.attributes) - } + def and(other: OperationAttributes): OperationAttributes = + if (attributes.isEmpty) other + else if (other.attributes.isEmpty) this + else OperationAttributes(attributes ::: other.attributes) private[akka] def nameLifted: Option[String] = attributes.collect { @@ -37,6 +37,8 @@ final case class OperationAttributes private (private val attributes: List[Opera attributes.collect { case InputBuffer(initial, max) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withInputBuffer(initial, max) case Dispatcher(dispatcher) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withDispatcher(dispatcher) + case SupervisionStrategy(decider) ⇒ (s: ActorFlowMaterializerSettings) ⇒ + s.withSupervisionStrategy(decider) }.reduceOption(_ andThen _).getOrElse(identity) // FIXME is this the optimal way of encoding this? private[akka] def transform(node: AstNode): AstNode = @@ -62,6 +64,7 @@ object OperationAttributes { private[OperationAttributes] final case class Name(n: String) extends Attribute private[OperationAttributes] final case class InputBuffer(initial: Int, max: Int) extends Attribute private[OperationAttributes] final case class Dispatcher(dispatcher: String) extends Attribute + private[OperationAttributes] final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes = apply(List(attribute)) @@ -82,4 +85,10 @@ object OperationAttributes { * Specifies the name of the dispatcher. */ def dispatcher(dispatcher: String): OperationAttributes = OperationAttributes(Dispatcher(dispatcher)) + + /** + * Decides how exceptions from user are to be handled. + */ + def supervisionStrategy(decider: Supervision.Decider): OperationAttributes = + OperationAttributes(SupervisionStrategy(decider)) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index ec83b84074..bd21d903b2 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -3,6 +3,8 @@ */ package akka.stream.stage +import akka.stream.Supervision + /** * General interface for stream transformation. * @@ -85,6 +87,25 @@ private[stream] abstract class AbstractStage[-In, Out, PushD <: Directive, PullD */ def onUpstreamFailure(cause: Throwable, ctx: Ctx): TerminationDirective = ctx.fail(cause) + /** + * If an exception is thrown from [[#onPush]] this method is invoked to decide how + * to handle the exception. By default this method returns [[Supervision.Stop]]. + * + * If an exception is thrown from [[#onPull]] the stream will always be completed with + * failure, because it is not always possible to recover from that state. + * In concrete stages it is of course possible to use ordinary try-catch-recover inside + * `onPull` when it is know how to recover from such exceptions. + * + */ + def decide(t: Throwable): Supervision.Directive = Supervision.Stop + + /** + * Used to create a fresh instance of the stage after an error resulting in a [[Supervision.Restart]] + * directive. By default it will return the same instance untouched, so you must override it + * if there are any state that should be cleared before restarting, e.g. by returning a new instance. + */ + def restart(): Stage[In, Out] = this + } /** @@ -164,7 +185,20 @@ abstract class PushStage[In, Out] extends PushPullStage[In, Out] { * * @see [[PushPullStage]] */ -abstract class DetachedStage[In, Out] extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]] +abstract class DetachedStage[In, Out] extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]] { + + /** + * If an exception is thrown from [[#onPush]] this method is invoked to decide how + * to handle the exception. By default this method returns [[Supervision.Stop]]. + * + * If an exception is thrown from [[#onPull]] or if the stage is holding state the stream + * will always be completed with failure, because it is not always possible to recover from + * that state. + * In concrete stages it is of course possible to use ordinary try-catch-recover inside + * `onPull` when it is know how to recover from such exceptions. + */ + override def decide(t: Throwable): Supervision.Directive = super.decide(t) +} /** * The behavior of [[StatefulStage]] is defined by these two methods, which