From 2959159e78e1a6f50999ec838ffb6f55d92ebeb1 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Mon, 23 Feb 2015 11:54:02 +0100 Subject: [PATCH] small fixes to stream supervision --- akka-docs-dev/rst/java/stream-error.rst | 7 +++- akka-docs-dev/rst/scala/stream-error.rst | 7 +++- .../akka/stream/ActorFlowMaterializer.scala | 11 ++++- .../main/scala/akka/stream/Supervision.scala | 42 ++++++++----------- 4 files changed, 37 insertions(+), 30 deletions(-) diff --git a/akka-docs-dev/rst/java/stream-error.rst b/akka-docs-dev/rst/java/stream-error.rst index 68e86a0e14..d93599a2a1 100644 --- a/akka-docs-dev/rst/java/stream-error.rst +++ b/akka-docs-dev/rst/java/stream-error.rst @@ -6,7 +6,7 @@ Error Handling Strategies for how to handle exceptions from processing stream elements can be defined when materializing the stream. The error handling strategies are inspired by actor supervision -strategies, but the semantics has been adapted to the domain of stream processing. +strategies, but the semantics have been adapted to the domain of stream processing. Supervision Strategies ====================== @@ -32,7 +32,10 @@ The default supervision strategy for a stream can be defined on the settings of Here you can see that all ``ArithmeticException`` will resume the processing, i.e. the elements that cause the division by zero are effectively dropped. -Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-java`. +.. note:: + + Be aware that dropping elements may result in deadlocks in graphs with + cycles, as explained in :ref:`graph-cycles-java`. The supervision strategy can also be defined for a section of flow operators. diff --git a/akka-docs-dev/rst/scala/stream-error.rst b/akka-docs-dev/rst/scala/stream-error.rst index 6e77303173..38cbcc6424 100644 --- a/akka-docs-dev/rst/scala/stream-error.rst +++ b/akka-docs-dev/rst/scala/stream-error.rst @@ -6,7 +6,7 @@ Error Handling Strategies for how to handle exceptions from processing stream elements can be defined when materializing the stream. The error handling strategies are inspired by actor supervision -strategies, but the semantics has been adapted to the domain of stream processing. +strategies, but the semantics have been adapted to the domain of stream processing. Supervision Strategies ====================== @@ -32,7 +32,10 @@ The default supervision strategy for a stream can be defined on the settings of Here you can see that all ``ArithmeticException`` will resume the processing, i.e. the elements that cause the division by zero are effectively dropped. -Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-scala`. +.. note:: + + Be aware that dropping elements may result in deadlocks in graphs with + cycles, as explained in :ref:`graph-cycles-scala`. The supervision strategy can also be defined for a section of flow operators. diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 7277bb29db..0c3fdb3ef1 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -252,8 +252,15 @@ final case class ActorFlowMaterializerSettings( * overridden for specific sections of the stream operations with * [[akka.stream.javadsl.OperationAttributes#supervisionStrategy]]. */ - def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = - copy(supervisionDecider = e ⇒ decider.apply(e)) + def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = { + import Supervision._ + copy(supervisionDecider = decider match { + case `resumingDecider` => resumingDecider + case `restartingDecider` => restartingDecider + case `stoppingDecider` => stoppingDecider + case other => other.apply _ + }) + } def withDebugLogging(enable: Boolean): ActorFlowMaterializerSettings = copy(debugLogging = enable) diff --git a/akka-stream/src/main/scala/akka/stream/Supervision.scala b/akka-stream/src/main/scala/akka/stream/Supervision.scala index 2957f8fe44..e2e45bf372 100644 --- a/akka-stream/src/main/scala/akka/stream/Supervision.scala +++ b/akka-stream/src/main/scala/akka/stream/Supervision.scala @@ -10,7 +10,7 @@ object Supervision { sealed trait Directive /** - * The stream will be completed with failure if application code for processing an element + * Scala API: The stream will be completed with failure if application code for processing an element * throws an exception. */ case object Stop extends Directive @@ -22,7 +22,7 @@ object Supervision { def stop = Stop /** - * The element is dropped and the stream continues if application code for processing + * Scala API: The element is dropped and the stream continues if application code for processing * an element throws an exception. */ case object Resume extends Directive @@ -34,7 +34,7 @@ object Supervision { def resume = Resume /** - * The element is dropped and the stream continues after restarting the stage + * Scala API: The element is dropped and the stream continues after restarting the stage * if application code for processing an element throws an exception. * Restarting a stage means that any accumulated state is cleared. This is typically * performed by creating a new instance of the stage. @@ -54,46 +54,40 @@ object Supervision { /** * Scala API: [[Decider]] that returns [[Stop]] for all exceptions. */ - val stoppingDecider: Decider = { - case NonFatal(_) ⇒ Stop - } + val stoppingDecider: Decider with japi.Function[Throwable, Directive] = + new Decider with japi.Function[Throwable, Directive] { + override def apply(e: Throwable) = Stop + } /** * Java API: Decider function that returns [[#stop]] for all exceptions. */ - val getStoppingDecider: japi.Function[Throwable, Directive] = - new japi.Function[Throwable, Directive] { - override def apply(e: Throwable): Directive = stoppingDecider(e) - } + val getStoppingDecider: japi.Function[Throwable, Directive] = stoppingDecider /** * Scala API: [[Decider]] that returns [[Resume]] for all exceptions. */ - val resumingDecider: Decider = { - case NonFatal(_) ⇒ Resume - } + val resumingDecider: Decider with japi.Function[Throwable, Directive] = + new Decider with japi.Function[Throwable, Directive] { + override def apply(e: Throwable) = Resume + } /** * Java API: Decider function that returns [[#resume]] for all exceptions. */ - val getResumingDecider: japi.Function[Throwable, Directive] = - new japi.Function[Throwable, Directive] { - override def apply(e: Throwable): Directive = resumingDecider(e) - } + val getResumingDecider: japi.Function[Throwable, Directive] = resumingDecider /** * Scala API: [[Decider]] that returns [[Restart]] for all exceptions. */ - val restartingDecider: Decider = { - case NonFatal(_) ⇒ Restart - } + val restartingDecider: Decider with japi.Function[Throwable, Directive] = + new Decider with japi.Function[Throwable, Directive] { + override def apply(e: Throwable) = Restart + } /** * Java API: Decider function that returns [[#restart]] for all exceptions. */ - val getRestartingDecider: japi.Function[Throwable, Directive] = - new japi.Function[Throwable, Directive] { - override def apply(e: Throwable): Directive = restartingDecider(e) - } + val getRestartingDecider: japi.Function[Throwable, Directive] = restartingDecider }