+str #15750 Prototype of stream supervision

* add supervision for stages and built-in ops run by the interpreter
* add supervision for mapAsync and mapAsyncUnordered
* add supervision to groupBy and splitWhen
* reference doc for scala and java
Patrik Nordwall 2015-02-04 09:26:32 +01:00
parent 3b46966240
commit 693dcbefcc
33 changed files with 1563 additions and 179 deletions

View file

@ -0,0 +1,71 @@
.. _stream-error-java:
##############
Error Handling
##############
Strategies for how to handle exceptions from processing stream elements can be defined when
materializing the stream. The error handling strategies are inspired by actor supervision
strategies, but the semantics have been adapted to the domain of stream processing.
Supervision Strategies
======================
There are three ways to handle exceptions from application code:
* ``Stop`` - The stream is completed with failure.
* ``Resume`` - The element is dropped and the stream continues.
* ``Restart`` - The element is dropped and the stream continues after restarting the stage.
Restarting a stage means that any accumulated state is cleared. This is typically
performed by creating a new instance of the stage.
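A decider is simply a function from the thrown exception to one of the directives
above. As a minimal sketch (shown in Scala for brevity; the Java API takes a
``japi.Function<Throwable, Supervision.Directive>`` and provides the predefined
deciders as ``Supervision.getStoppingDecider``, ``Supervision.getResumingDecider``
and ``Supervision.getRestartingDecider``)::

  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }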
By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with
failure when an exception is thrown.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#stop
The default supervision strategy for a stream can be defined on the settings of the materializer.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#resume
Here you can see that any ``ArithmeticException`` will resume the processing, i.e. the
elements that cause the division by zero are effectively dropped.
Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-java`.
The supervision strategy can also be defined for a section of flow operators.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#resume-section
``Restart`` works in a similar way as ``Resume``, with the addition that any accumulated
state of the failing processing stage will be reset.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#restart-section
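A stage takes part in ``Restart`` by supplying a fresh instance of itself. A minimal
sketch of such a stage, assuming the ``PushPullStage`` API used elsewhere in this
commit (``SummingStage`` and its ``sum`` field are hypothetical examples of
accumulated state)::

  class SummingStage extends PushPullStage[Int, Int] {
    private var sum = 0 // accumulated state, cleared by restart
    override def onPush(elem: Int, ctx: Context[Int]): Directive = {
      if (elem < 0) throw new IllegalArgumentException("negative not allowed")
      sum += elem
      ctx.push(sum)
    }
    override def onPull(ctx: Context[Int]): Directive = ctx.pull()
    // invoked by the interpreter when the supervision decision is Restart
    override def restart(): Stage[Int, Int] = new SummingStage
  }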
Errors from mapAsync
====================
Stream supervision can also be applied to the futures of ``mapAsync``.
Let's say that we use an external service to look up email addresses and we would like to
discard those that cannot be found.
We start with the tweet stream of authors:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/IntegrationDocTest.java#tweet-authors
Assume that we can look up their email address using:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/IntegrationDocTest.java#email-address-lookup2
The ``Future`` is completed with ``Failure`` if the email is not found.
Transforming the stream of authors to a stream of email addresses by using the ``lookupEmail``
service can be done with ``mapAsync`` and we use ``Supervision.getResumingDecider`` to drop
unknown email addresses:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/IntegrationDocTest.java#email-addresses-mapAsync-supervision
If we did not use ``Resume``, the default stopping strategy would complete the stream
with failure on the first ``Future`` that was completed with ``Failure``.

View file

@ -14,6 +14,7 @@ Streams
stream-rate
stream-customize
stream-integrations
stream-error
stream-io
stream-cookbook
../stream-configuration

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import scala.concurrent.Await
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.Supervision
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
class FlowErrorDocSpec extends AkkaSpec {
"demonstrate fail stream" in {
//#stop
implicit val mat = ActorFlowMaterializer()
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)
//#stop
intercept[ArithmeticException] {
Await.result(result, remaining)
}
}
"demonstrate resume stream" in {
//#resume
val decider: Supervision.Decider = exc => exc match {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val mat = ActorFlowMaterializer(
ActorFlowMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
//#resume
Await.result(result, remaining) should be(228)
}
"demonstrate resume section" in {
//#resume-section
implicit val mat = ActorFlowMaterializer()
val decider: Supervision.Decider = exc => exc match {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
val source = Source(0 to 5).section(OperationAttributes.supervisionStrategy(decider)) {
_.filter(100 / _ < 50).map(elem => 100 / (5 - elem))
}
val result = source.runWith(Sink.fold(0)(_ + _))
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)
//#resume-section
Await.result(result, remaining) should be(150)
}
"demonstrate restart section" in {
//#restart-section
implicit val mat = ActorFlowMaterializer()
val decider: Supervision.Decider = exc => exc match {
case _: IllegalArgumentException => Supervision.Restart
case _ => Supervision.Stop
}
val source = Source(List(1, 3, -1, 5, 7)).section(
OperationAttributes.supervisionStrategy(decider)) {
_.scan(0) { (acc, elem) =>
if (elem < 0) throw new IllegalArgumentException("negative not allowed")
else acc + elem
}
}
val result = source.grouped(1000).runWith(Sink.head)
// the negative element causes the scan stage to be restarted,
// i.e. start from 0 again
// result here will be a Future completed with Success(Vector(0, 1, 0, 5, 12))
//#restart-section
Await.result(result, remaining) should be(Vector(0, 1, 0, 5, 12))
}
}

View file

@ -22,6 +22,7 @@ import akka.stream.scaladsl.OperationAttributes
import scala.concurrent.ExecutionContext
import akka.stream.ActorFlowMaterializerSettings
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.Supervision
object IntegrationDocSpec {
import TwitterStreamQuickstartDocSpec._
@ -52,6 +53,13 @@ object IntegrationDocSpec {
Future.successful(Some(handle.hashCode.toString))
}
class AddressSystem2 {
//#email-address-lookup2
def lookupEmail(handle: String): Future[String] =
//#email-address-lookup2
Future.successful(handle + "@somewhere.com")
}
final case class Email(to: String, title: String, body: String)
final case class TextMessage(to: String, body: String)
@ -159,6 +167,22 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
probe.expectMsg("akkateam@somewhere.com")
}
"lookup email with mapAsync and supervision" in {
val addressSystem = new AddressSystem2
val authors: Source[Author] =
tweets.filter(_.hashtags.contains(akka)).map(_.author)
//#email-addresses-mapAsync-supervision
import OperationAttributes.supervisionStrategy
import Supervision.resumingDecider
val emailAddresses: Source[String] =
authors.section(supervisionStrategy(resumingDecider)) {
_.mapAsync(author => addressSystem.lookupEmail(author.handle))
}
//#email-addresses-mapAsync-supervision
}
"calling external service with mapAsyncUnordered" in {
val probe = TestProbe()
val addressSystem = new AddressSystem

View file

@ -0,0 +1,71 @@
.. _stream-error-scala:
##############
Error Handling
##############
Strategies for how to handle exceptions from processing stream elements can be defined when
materializing the stream. The error handling strategies are inspired by actor supervision
strategies, but the semantics have been adapted to the domain of stream processing.
Supervision Strategies
======================
There are three ways to handle exceptions from application code:
* ``Stop`` - The stream is completed with failure.
* ``Resume`` - The element is dropped and the stream continues.
* ``Restart`` - The element is dropped and the stream continues after restarting the stage.
Restarting a stage means that any accumulated state is cleared. This is typically
performed by creating a new instance of the stage.
By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with
failure when an exception is thrown.
.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#stop
The default supervision strategy for a stream can be defined on the settings of the materializer.
.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#resume
Here you can see that any ``ArithmeticException`` will resume the processing, i.e. the
elements that cause the division by zero are effectively dropped.
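For reference, a condensed sketch of the resume example above, as exercised by
``FlowErrorDocSpec`` in this commit::

  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }
  implicit val mat = ActorFlowMaterializer(
    ActorFlowMaterializerSettings(system).withSupervisionStrategy(decider))
  val result = Source(0 to 5).map(100 / _).runWith(Sink.fold(0)(_ + _))
  // the element causing division by zero is dropped;
  // result is a Future completed with Success(228)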
Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-scala`.
The supervision strategy can also be defined for a section of flow operators.
.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#resume-section
``Restart`` works in a similar way as ``Resume``, with the addition that any accumulated
state of the failing processing stage will be reset.
.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#restart-section
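The observable effect of ``Restart`` is that the stage starts over from its initial
state; a condensed sketch of the restart-section example above::

  val decider: Supervision.Decider = {
    case _: IllegalArgumentException => Supervision.Restart
    case _                           => Supervision.Stop
  }
  val source = Source(List(1, 3, -1, 5, 7)).section(
    OperationAttributes.supervisionStrategy(decider)) {
      _.scan(0) { (acc, elem) =>
        if (elem < 0) throw new IllegalArgumentException("negative not allowed")
        else acc + elem
      }
    }
  // the negative element causes the scan stage to be restarted, i.e. to start
  // from 0 again, so the emitted elements are 0, 1, 0, 5, 12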
Errors from mapAsync
====================
Stream supervision can also be applied to the futures of ``mapAsync``.
Let's say that we use an external service to look up email addresses and we would like to
discard those that cannot be found.
We start with the tweet stream of authors:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#tweet-authors
Assume that we can look up their email address using:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-address-lookup2
The ``Future`` is completed with ``Failure`` if the email is not found.
Transforming the stream of authors to a stream of email addresses by using the ``lookupEmail``
service can be done with ``mapAsync`` and we use ``Supervision.resumingDecider`` to drop
unknown email addresses:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-addresses-mapAsync-supervision
If we did not use ``Resume``, the default stopping strategy would complete the stream
with failure on the first ``Future`` that was completed with ``Failure``.
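Putting the pieces together, a condensed sketch of the ``mapAsync`` example above,
as exercised by ``IntegrationDocSpec`` in this commit::

  import OperationAttributes.supervisionStrategy
  import Supervision.resumingDecider

  val emailAddresses: Source[String] =
    authors.section(supervisionStrategy(resumingDecider)) {
      _.mapAsync(author => addressSystem.lookupEmail(author.handle))
    }
  // authors whose lookup Future is completed with Failure are
  // dropped instead of failing the whole stream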

View file

@ -14,6 +14,7 @@ Streams
stream-rate
stream-customize
stream-integrations
stream-error
stream-io
stream-cookbook
../stream-configuration

View file

@ -10,8 +10,8 @@ import akka.stream.scaladsl.OperationAttributes._
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import org.reactivestreams.{ Publisher, Processor }
import akka.stream.impl.fusing.Map
import scala.concurrent.Promise
import akka.stream.Supervision
class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
@ -26,7 +26,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet()
val (processor, _ns) = materializer.asInstanceOf[ActorFlowMaterializerImpl].processorForNode(
Ast.Fused(List(Map[Int, Int](identity)), name("identity")), flowName, 1)
Ast.Fused(List(Map[Int, Int](identity, Supervision.stoppingDecider)), name("identity")), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}

View file

@ -10,6 +10,7 @@ import akka.testkit.TestProbe
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
import scala.util.control.NoStackTrace
object StreamTestKit {
@ -176,4 +177,6 @@ object StreamTestKit {
def getPublisher: Publisher[I] = this
}
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
}

View file

@ -3,15 +3,17 @@
*/
package akka.stream.impl.fusing
import akka.stream.impl.fusing.Map
import scala.util.control.NoStackTrace
import akka.stream.Supervision
class InterpreterSpec extends InterpreterSpecKit {
import Supervision.stoppingDecider
import Supervision.resumingDecider
import Supervision.restartingDecider
"Interpreter" must {
"implement map correctly" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1))) {
"implement map correctly" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
@ -31,9 +33,9 @@ class InterpreterSpec extends InterpreterSpecKit {
}
"implement chain of maps correctly" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1),
Map((x: Int) ⇒ x * 2),
Map((x: Int) ⇒ x + 1))) {
Map((x: Int) ⇒ x + 1, stoppingDecider),
Map((x: Int) ⇒ x * 2, stoppingDecider),
Map((x: Int) ⇒ x + 1, stoppingDecider))) {
lastEvents() should be(Set.empty)
@ -68,7 +70,7 @@ class InterpreterSpec extends InterpreterSpecKit {
"implement one-to-many many-to-one chain correctly" in new TestSetup(Seq(
Doubler(),
Filter((x: Int) ⇒ x != 0))) {
Filter((x: Int) ⇒ x != 0, stoppingDecider))) {
lastEvents() should be(Set.empty)
@ -92,7 +94,7 @@ class InterpreterSpec extends InterpreterSpecKit {
}
"implement many-to-one one-to-many chain correctly" in new TestSetup(Seq(
Filter((x: Int) ⇒ x != 0),
Filter((x: Int) ⇒ x != 0, stoppingDecider),
Doubler())) {
lastEvents() should be(Set.empty)
@ -134,9 +136,9 @@ class InterpreterSpec extends InterpreterSpecKit {
}
"implement take inside a chain" in new TestSetup(Seq(
Filter((x: Int) ⇒ x != 0),
Filter((x: Int) ⇒ x != 0, stoppingDecider),
Take(2),
Map((x: Int) ⇒ x + 1))) {
Map((x: Int) ⇒ x + 1, stoppingDecider))) {
lastEvents() should be(Set.empty)
@ -156,7 +158,7 @@ class InterpreterSpec extends InterpreterSpecKit {
lastEvents() should be(Set(Cancel, OnComplete, OnNext(3)))
}
"implement fold" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) {
"implement fold" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
@ -175,7 +177,7 @@ class InterpreterSpec extends InterpreterSpecKit {
lastEvents() should be(Set(OnNext(3), OnComplete))
}
"implement fold with proper cancel" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) {
"implement fold with proper cancel" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) {
lastEvents() should be(Set.empty)
@ -195,7 +197,7 @@ class InterpreterSpec extends InterpreterSpecKit {
lastEvents() should be(Set(Cancel))
}
"work if fold completes while not in a push position" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x))) {
"work if fold completes while not in a push position" in new TestSetup(Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) {
lastEvents() should be(Set.empty)
@ -233,7 +235,8 @@ class InterpreterSpec extends InterpreterSpecKit {
"implement conflate" in new TestSetup(Seq(Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x))) {
(agg: Int, x: Int) ⇒ agg + x,
stoppingDecider))) {
lastEvents() should be(Set(RequestOne))
@ -293,10 +296,12 @@ class InterpreterSpec extends InterpreterSpecKit {
"work with conflate-conflate" in new TestSetup(Seq(
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x),
(agg: Int, x: Int) ⇒ agg + x,
stoppingDecider),
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x))) {
(agg: Int, x: Int) ⇒ agg + x,
stoppingDecider))) {
lastEvents() should be(Set(RequestOne))
@ -366,7 +371,8 @@ class InterpreterSpec extends InterpreterSpecKit {
"implement conflate-expand" in new TestSetup(Seq(
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x),
(agg: Int, x: Int) ⇒ agg + x,
stoppingDecider),
Expand(
(in: Int) ⇒ in,
(agg: Int) ⇒ (agg, agg)))) {
@ -407,7 +413,8 @@ class InterpreterSpec extends InterpreterSpecKit {
Doubler(),
Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ agg + x))) {
(agg: Int, x: Int) ⇒ agg + x,
stoppingDecider))) {
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
@ -422,11 +429,11 @@ class InterpreterSpec extends InterpreterSpecKit {
}
"work with jumpback table and completed elements" in new TestSetup(Seq(
Map((x: Int) ⇒ x),
Map((x: Int) ⇒ x),
Map((x: Int) ⇒ x, stoppingDecider),
Map((x: Int) ⇒ x, stoppingDecider),
KeepGoing(),
Map((x: Int) ⇒ x),
Map((x: Int) ⇒ x))) {
Map((x: Int) ⇒ x, stoppingDecider),
Map((x: Int) ⇒ x, stoppingDecider))) {
lastEvents() should be(Set.empty)
@ -469,56 +476,6 @@ class InterpreterSpec extends InterpreterSpecKit {
"implement drop-take" in pending
val TE = new Exception("TEST") with NoStackTrace {
override def toString = "TE"
}
"handle external failure" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1))) {
lastEvents() should be(Set.empty)
upstream.onError(TE)
lastEvents() should be(Set(OnError(TE)))
}
"handle failure inside op" in new TestSetup(Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0)
lastEvents() should be(Set(Cancel, OnError(TE)))
}
"handle failure inside op in middle of the chain" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1),
Map((x: Int) ⇒ if (x == 0) throw TE else x),
Map((x: Int) ⇒ x + 1))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(4)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1)
lastEvents() should be(Set(Cancel, OnError(TE)))
}
"work with keep-going ops" in pending
}

View file

@ -13,6 +13,7 @@ trait InterpreterSpecKit extends AkkaSpec {
case class OnError(cause: Throwable)
case class OnNext(elem: Any)
case object RequestOne
case object RequestAnother
private[akka] case class Doubler[T]() extends PushPullStage[T, T] {
var oneMore: Boolean = false
@ -71,6 +72,9 @@ trait InterpreterSpecKit extends AkkaSpec {
}
override def onPull(ctx: BoundaryContext): Directive = {
if (lastEvent(RequestOne))
lastEvent += RequestAnother
else
lastEvent += RequestOne
ctx.exit()
}

View file

@ -3,7 +3,10 @@
*/
package akka.stream.impl.fusing
import akka.stream.Supervision
class InterpreterStressSpec extends InterpreterSpecKit {
import Supervision.stoppingDecider
val chainLength = 1000 * 1000
val halfLength = chainLength / 2
@ -11,7 +14,7 @@ class InterpreterStressSpec extends InterpreterSpecKit {
"Interpreter" must {
"work with a massive chain of maps" in new TestSetup(Seq.fill(chainLength)(Map((x: Int) ⇒ x + 1))) {
"work with a massive chain of maps" in new TestSetup(Seq.fill(chainLength)(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
lastEvents() should be(Set.empty)
val tstamp = System.nanoTime()
@ -33,9 +36,9 @@ class InterpreterStressSpec extends InterpreterSpecKit {
info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s")
}
"work with a massive chain of maps with early complete" in new TestSetup(Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1)) ++
"work with a massive chain of maps with early complete" in new TestSetup(Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider)) ++
Seq(Take(repetition / 2)) ++
Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1))) {
Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
lastEvents() should be(Set.empty)
val tstamp = System.nanoTime()
@ -92,7 +95,8 @@ class InterpreterStressSpec extends InterpreterSpecKit {
"work with a massive chain of conflates by overflowing to the heap" in new TestSetup(Seq.fill(100000)(Conflate(
(in: Int) ⇒ in,
(agg: Int, in: Int) ⇒ agg + in)),
(agg: Int, in: Int) ⇒ agg + in,
Supervision.stoppingDecider)),
forkLimit = 100,
overflowToHeap = true) {

View file

@ -0,0 +1,523 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.fusing
import scala.util.control.NoStackTrace
import akka.stream.Supervision
import akka.stream.stage.Context
import akka.stream.stage.Directive
import akka.stream.stage.PushPullStage
import akka.stream.stage.Stage
import akka.stream.stage.TerminationDirective
object InterpreterSupervisionSpec {
val TE = new Exception("TEST") with NoStackTrace {
override def toString = "TE"
}
class RestartTestStage extends PushPullStage[Int, Int] {
var sum = 0
def onPush(elem: Int, ctx: Context[Int]): Directive = {
sum += elem
ctx.push(sum)
}
override def onPull(ctx: Context[Int]): Directive = {
ctx.pull()
}
override def decide(t: Throwable): Supervision.Directive = Supervision.Restart
override def restart(): Stage[Int, Int] = {
sum = 0
this
}
}
case class OneToManyTestStage(decider: Supervision.Decider, absorbTermination: Boolean = false) extends PushPullStage[Int, Int] {
var buf: List[Int] = Nil
def onPush(elem: Int, ctx: Context[Int]): Directive = {
buf = List(elem + 1, elem + 2, elem + 3)
ctx.push(elem)
}
override def onPull(ctx: Context[Int]): Directive = {
if (buf.isEmpty && ctx.isFinishing)
ctx.finish()
else if (buf.isEmpty)
ctx.pull()
else {
val elem = buf.head
buf = buf.tail
if (elem == 3) throw TE
ctx.push(elem)
}
}
override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective =
if (absorbTermination)
ctx.absorbTermination()
else
ctx.finish()
// note that Resume will be turned into failure in the Interpreter if an exception is thrown from onPull
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def restart(): OneToManyTestStage = copy()
}
}
class InterpreterSupervisionSpec extends InterpreterSpecKit {
import InterpreterSupervisionSpec._
import Supervision.stoppingDecider
import Supervision.resumingDecider
import Supervision.restartingDecider
"Interpreter error handling" must {
"handle external failure" in new TestSetup(Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
lastEvents() should be(Set.empty)
upstream.onError(TE)
lastEvents() should be(Set(OnError(TE)))
}
"emit failure when op throws" in new TestSetup(Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0) // boom
lastEvents() should be(Set(Cancel, OnError(TE)))
}
"emit failure when op throws in middle of the chain" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1, stoppingDecider),
Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, stoppingDecider),
Map((x: Int) ⇒ x + 100, stoppingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(113)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1) // boom
lastEvents() should be(Set(Cancel, OnError(TE)))
}
"resume when Map throws" in new TestSetup(Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, resumingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0) // boom
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(OnNext(3)))
}
"resume when Map throws in middle of the chain" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1, resumingDecider),
Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, resumingDecider),
Map((x: Int) ⇒ x + 100, resumingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(113)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1) // boom
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(OnNext(114)))
}
"resume when Map throws before Grouped" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1, resumingDecider),
Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider),
Grouped(3))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(RequestOne))
upstream.onNext(4)
lastEvents() should be(Set(OnNext(Vector(13, 14, 15))))
}
"complete after resume when Map throws before Grouped" in new TestSetup(Seq(
Map((x: Int) ⇒ x + 1, resumingDecider),
Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider),
Grouped(1000))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(RequestOne))
upstream.onComplete()
lastEvents() should be(Set(OnNext(Vector(13, 14)), OnComplete))
}
"restart when onPush throws" in {
val stage = new RestartTestStage {
override def onPush(elem: Int, ctx: Context[Int]): Directive = {
if (elem <= 0) throw TE
else super.onPush(elem, ctx)
}
}
new TestSetup(Seq(
Map((x: Int) ⇒ x + 1, restartingDecider),
stage,
Map((x: Int) ⇒ x + 100, restartingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(103)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1) // boom
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(OnNext(104)))
}
}
"restart when onPush throws after ctx.push" in {
val stage = new RestartTestStage {
override def onPush(elem: Int, ctx: Context[Int]): Directive = {
val ret = ctx.push(sum)
super.onPush(elem, ctx)
if (elem <= 0) throw TE
ret
}
}
new TestSetup(Seq(
Map((x: Int) ⇒ x + 1, restartingDecider),
stage,
Map((x: Int) ⇒ x + 100, restartingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(103)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(-1) // boom
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(OnNext(104)))
}
}
"fail when onPull throws" in {
val stage = new RestartTestStage {
override def onPull(ctx: Context[Int]): Directive = {
if (sum < 0) throw TE
super.onPull(ctx)
}
}
new TestSetup(Seq(
Map((x: Int) ⇒ x + 1, restartingDecider),
stage,
Map((x: Int) ⇒ x + 100, restartingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(103)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(-5) // this will trigger failure of next requestOne (pull)
lastEvents() should be(Set(OnNext(99)))
downstream.requestOne() // boom
lastEvents() should be(Set(OnError(TE), Cancel))
}
}
"resume when Filter throws" in new TestSetup(Seq(
Filter((x: Int) ⇒ if (x == 0) throw TE else true, resumingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(OnNext(3)))
}
"resume when MapConcat throws" in new TestSetup(Seq(
MapConcat((x: Int) ⇒ if (x == 0) throw TE else List(x, -x), resumingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(-1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0) // boom
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(-2)))
}
"restart when Collect throws" in {
// TODO can't get type inference to work with `pf` inlined
val pf: PartialFunction[Int, Int] =
{ case x: Int ⇒ if (x == 0) throw TE else x }
new TestSetup(Seq(
Collect(restartingDecider)(pf))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(3)
lastEvents() should be(Set(OnNext(3)))
}
}
"resume when Scan throws" in new TestSetup(Seq(
Scan(1, (acc: Int, x: Int) ⇒ if (x == 10) throw TE else acc + x, resumingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(10) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(4)
lastEvents() should be(Set(OnNext(3))) // 1 + 2
}
"restart when Scan throws" in new TestSetup(Seq(
Scan(1, (acc: Int, x: Int) ⇒ if (x == 10) throw TE else acc + x, restartingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(10) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(4)
lastEvents() should be(Set(OnNext(1))) // starts over again
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(20)
lastEvents() should be(Set(OnNext(5))) // 1+4
}
"restart when Conflate `seed` throws" in new TestSetup(Seq(Conflate(
(in: Int) ⇒ if (in == 1) throw TE else in,
(agg: Int, x: Int) ⇒ agg + x,
restartingDecider))) {
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set.empty)
upstream.onNext(0)
lastEvents() should be(Set(OnNext(0), RequestOne))
upstream.onNext(1) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(RequestOne))
upstream.onNext(10)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(12))) // note that 1 has been discarded
downstream.requestOne()
lastEvents() should be(Set.empty)
}
"restart when Conflate `aggregate` throws" in new TestSetup(Seq(Conflate(
(in: Int) ⇒ in,
(agg: Int, x: Int) ⇒ if (x == 2) throw TE else agg + x,
restartingDecider))) {
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set.empty)
upstream.onNext(0)
lastEvents() should be(Set(OnNext(0), RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(RequestOne))
upstream.onNext(2) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(10)
lastEvents() should be(Set(RequestOne))
downstream.requestOne()
lastEvents() should be(Set(OnNext(10))) // note that 1 and 2 have been discarded
downstream.requestOne()
lastEvents() should be(Set.empty)
upstream.onNext(4)
lastEvents() should be(Set(OnNext(4), RequestOne))
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
"fail when Expand `seed` throws" in new TestSetup(Seq(Expand(
(in: Int) ⇒ if (in == 2) throw TE else in,
(agg: Int) ⇒ (agg, -math.abs(agg))))) {
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne, OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(-1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(-1)))
upstream.onNext(2) // boom
lastEvents() should be(Set(OnError(TE), Cancel))
}
"fail when Expand `extrapolate` throws" in new TestSetup(Seq(Expand(
(in: Int) ⇒ in,
(agg: Int) ⇒ if (agg == 2) throw TE else (agg, -math.abs(agg))))) {
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne, OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(-1)))
upstream.onNext(2) // boom
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(OnError(TE), Cancel))
}
"fail when onPull throws before pushing all generated elements" in {
def test(decider: Supervision.Decider, absorbTermination: Boolean): Unit = {
new TestSetup(Seq(
OneToManyTestStage(decider, absorbTermination))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1)))
if (absorbTermination) {
upstream.onComplete()
lastEvents() should be(Set.empty)
}
downstream.requestOne()
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
// 3 => boom
if (absorbTermination)
lastEvents() should be(Set(OnError(TE)))
else
lastEvents() should be(Set(OnError(TE), Cancel))
}
}
test(resumingDecider, absorbTermination = false)
test(restartingDecider, absorbTermination = false)
test(resumingDecider, absorbTermination = true)
test(restartingDecider, absorbTermination = true)
}
}
}

View file

@ -7,21 +7,23 @@ import scala.collection.immutable
import akka.stream.testkit.AkkaSpec
import akka.util.ByteString
import akka.stream.stage._
import akka.stream.Supervision
class IteratorInterpreterSpec extends AkkaSpec {
import Supervision.stoppingDecider
"IteratorInterpreter" must {
"work in the happy case" in {
val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq(
Map((x: Int) ⇒ x + 1))).iterator
Map((x: Int) ⇒ x + 1, stoppingDecider))).iterator
itr.toSeq should be(2 to 11)
}
"hasNext should not affect elements" in {
val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq(
Map((x: Int) ⇒ x))).iterator
Map((x: Int) ⇒ x, stoppingDecider))).iterator
itr.hasNext should be(true)
itr.hasNext should be(true)
@ -40,7 +42,7 @@ class IteratorInterpreterSpec extends AkkaSpec {
"throw exceptions on empty iterator" in {
val itr = new IteratorInterpreter[Int, Int](List(1).iterator, Seq(
Map((x: Int) ⇒ x))).iterator
Map((x: Int) ⇒ x, stoppingDecider))).iterator
itr.next() should be(1)
a[NoSuchElementException] should be thrownBy { itr.next() }
@ -78,7 +80,7 @@ class IteratorInterpreterSpec extends AkkaSpec {
"work with an empty iterator" in {
val itr = new IteratorInterpreter[Int, Int](Iterator.empty, Seq(
Map((x: Int) ⇒ x + 1))).iterator
Map((x: Int) ⇒ x + 1, stoppingDecider))).iterator
itr.hasNext should be(false)
a[NoSuchElementException] should be thrownBy { itr.next() }

View file

@ -4,11 +4,12 @@
package akka.stream.scaladsl
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.Supervision.resumingDecider
import akka.stream.scaladsl.OperationAttributes.supervisionStrategy
import akka.stream.testkit._
import akka.stream.testkit.StreamTestKit.TE
import org.reactivestreams.Publisher
class FlowGroupBySpec extends AkkaSpec {
@ -52,8 +53,6 @@ class FlowGroupBySpec extends AkkaSpec {
}
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
"groupBy" must {
"work in the happy case" in new SubstreamsSupport(groupCount = 2) {
val s1 = StreamPuppet(getSubFlow(1).runWith(Sink.publisher))
@ -88,7 +87,6 @@ class FlowGroupBySpec extends AkkaSpec {
s1.expectComplete()
masterSubscriber.expectComplete()
}
"accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) {
@ -189,6 +187,77 @@ class FlowGroupBySpec extends AkkaSpec {
subscriber.expectError(e)
}
"fail stream when groupBy function throws" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2)
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
val (_, substream) = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
substreamPuppet.request(1)
substreamPuppet.expectNext(1)
upstreamSubscription.sendNext(2)
subscriber.expectError(exc)
substreamPuppet.expectError(exc)
upstreamSubscription.expectCancellation()
}
"resume stream when groupBy function throws" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe).section(supervisionStrategy(resumingDecider))(
_.groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2))
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
val (_, substream1) = subscriber.expectNext()
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher))
substreamPuppet1.request(10)
substreamPuppet1.expectNext(1)
upstreamSubscription.sendNext(2)
upstreamSubscription.sendNext(4)
val (_, substream2) = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(4) // note that 2 was dropped
upstreamSubscription.sendNext(3)
substreamPuppet1.expectNext(3)
upstreamSubscription.sendNext(6)
substreamPuppet2.expectNext(6)
upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet1.expectComplete()
substreamPuppet2.expectComplete()
}
}
}

View file

@ -8,12 +8,13 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NoStackTrace
import akka.stream.ActorFlowMaterializer
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.TestLatch
import akka.testkit.TestProbe
import akka.stream.scaladsl.OperationAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
class FlowMapAsyncSpec extends AkkaSpec {
@ -109,5 +110,31 @@ class FlowMapAsyncSpec extends AkkaSpec {
latch.countDown()
}
"resume after future failure" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(n ⇒ Future {
if (n == 3) throw new RuntimeException("err3") with NoStackTrace
else n
})).to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
for (n ← List(1, 2, 4, 5)) c.expectNext(n)
c.expectComplete()
}
"resume when mapAsync throws" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(n ⇒
if (n == 3) throw new RuntimeException("err4") with NoStackTrace
else Future(n))).
to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
for (n ← List(1, 2, 4, 5)) c.expectNext(n)
c.expectComplete()
}
}
}

View file

@ -7,12 +7,15 @@ import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.ActorFlowMaterializer
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.TestLatch
import akka.testkit.TestProbe
import akka.stream.scaladsl.OperationAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
import akka.stream.testkit.StreamTestKit.OnNext
import akka.stream.testkit.StreamTestKit.OnComplete
class FlowMapAsyncUnorderedSpec extends AkkaSpec {
@ -88,7 +91,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val latch = TestLatch(1)
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsync(n ⇒
val p = Source(1 to 5).mapAsyncUnordered(n ⇒
if (n == 3) throw new RuntimeException("err2") with NoStackTrace
else {
Future {
@ -103,5 +106,31 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
latch.countDown()
}
"resume after future failure" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(n ⇒ Future {
if (n == 3) throw new RuntimeException("err3") with NoStackTrace
else n
})).to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet
c.probe.receiveN(5).toSet should be(expected)
}
"resume when mapAsyncUnordered throws" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(n ⇒
if (n == 3) throw new RuntimeException("err4") with NoStackTrace
else Future(n))).
to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet
c.probe.receiveN(5).toSet should be(expected)
}
}
}

View file

@ -59,16 +59,16 @@ object FlowSpec {
override def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): (Processor[In, Out], MaterializedMap) = {
val props = op match {
case f: Fused ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage))
case Map(f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage))
case Filter(p, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage))
case Map(f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Map(f, att.settings(settings).supervisionDecider)), brokenMessage))
case Filter(p, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p, att.settings(settings).supervisionDecider)), brokenMessage))
case Drop(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage))
case Take(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage))
case Collect(pf, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage))
case Scan(z, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage))
case Collect(pf, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Collect(att.settings(settings).supervisionDecider)(pf)), brokenMessage))
case Scan(z, f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f, att.settings(settings).supervisionDecider)), brokenMessage))
case Expand(s, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage))
case Conflate(s, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage))
case Conflate(s, f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f, att.settings(settings).supervisionDecider)), brokenMessage))
case Buffer(n, s, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage))
case MapConcat(f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage))
case MapConcat(f, att) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f, att.settings(settings).supervisionDecider)), brokenMessage))
case o ⇒ ActorProcessorFactory.props(this, o)
}
val impl = actorOf(props.withDispatcher(settings.dispatcher), s"$flowName-$n-${op.attributes.name}")

View file

@ -4,11 +4,13 @@
package akka.stream.scaladsl
import scala.concurrent.duration._
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.Supervision.resumingDecider
import akka.stream.scaladsl.OperationAttributes.supervisionStrategy
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.TE
import org.reactivestreams.Publisher
class FlowSplitWhenSpec extends AkkaSpec {
@ -27,6 +29,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete()
def expectError(e: Throwable) = probe.expectError(e)
def cancel(): Unit = subscription.cancel()
}
@ -105,6 +108,82 @@ class FlowSplitWhenSpec extends AkkaSpec {
s1.expectComplete()
}
"fail stream when splitWhen function throws" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe)
.splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0)
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Source[Int]]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
val substream = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.runWith(Sink.publisher))
substreamPuppet.request(10)
substreamPuppet.expectNext(1)
upstreamSubscription.sendNext(2)
substreamPuppet.expectNext(2)
upstreamSubscription.sendNext(3)
subscriber.expectError(exc)
substreamPuppet.expectError(exc)
upstreamSubscription.expectCancellation()
}
"resume stream when splitWhen function throws" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val exc = TE("test")
val publisher = Source(publisherProbeProbe).section(supervisionStrategy(resumingDecider))(
_.splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0))
.runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Source[Int]]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
val substream1 = subscriber.expectNext()
val substreamPuppet1 = StreamPuppet(substream1.runWith(Sink.publisher))
substreamPuppet1.request(10)
substreamPuppet1.expectNext(1)
upstreamSubscription.sendNext(2)
substreamPuppet1.expectNext(2)
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped
upstreamSubscription.sendNext(5)
substreamPuppet1.expectNext(5)
upstreamSubscription.sendNext(6)
substreamPuppet1.expectComplete()
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.publisher))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(6)
upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()
}
}
}

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import scala.collection.immutable
import scala.concurrent.duration._
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.testkit.AkkaSpec
import scala.util.control.NoStackTrace
import scala.concurrent.Await
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.Supervision
class FlowSupervisionSpec extends AkkaSpec {
import OperationAttributes.supervisionStrategy
implicit val materializer = ActorFlowMaterializer()(system)
val exc = new RuntimeException("simulated exc") with NoStackTrace
val failingMap = (s: Source[Int]) ⇒ s.map(n ⇒ if (n == 3) throw exc else n)
// FIXME this would be more elegant with Flow[Int, Int] and `via`, but `via` is currently not propagating the OperationAttributes
def run(s: Source[Int] ⇒ Source[Int]): immutable.Seq[Int] =
Await.result(s(Source(1 to 5)).grouped(1000).runWith(Sink.head), 3.seconds)
"Stream supervision" must {
"stop and complete stream with failure by default" in {
intercept[RuntimeException] {
run(failingMap)
} should be(exc)
}
"support resume" in {
val result = run(s ⇒ s.section(supervisionStrategy(Supervision.resumingDecider))(
failingMap(_)))
result should be(List(1, 2, 4, 5))
}
}
}

View file

@ -18,6 +18,7 @@ import org.reactivestreams.Subscriber
import scala.concurrent.duration._
import akka.actor.Props
import akka.actor.ActorRef
import akka.stream.javadsl.japi
object ActorFlowMaterializer {
@ -196,6 +197,7 @@ object ActorFlowMaterializerSettings {
initialInputBufferSize = config.getInt("initial-input-buffer-size"),
maxInputBufferSize = config.getInt("max-input-buffer-size"),
dispatcher = config.getString("dispatcher"),
supervisionDecider = Supervision.stoppingDecider,
subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config),
fileIODispatcher = config.getString("file-io-dispatcher"),
debugLogging = config.getBoolean("debug-logging"),
@ -230,6 +232,7 @@ final case class ActorFlowMaterializerSettings(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
dispatcher: String,
supervisionDecider: Supervision.Decider,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
fileIODispatcher: String, // FIXME Why does this exist?!
debugLogging: Boolean,
@ -247,6 +250,22 @@ final case class ActorFlowMaterializerSettings(
def withDispatcher(dispatcher: String): ActorFlowMaterializerSettings =
copy(dispatcher = dispatcher)
/**
* Scala API: Decides how exceptions from application code are to be handled, unless
* overridden for specific sections of the stream operations with
* [[akka.stream.scaladsl.OperationAttributes#supervisionStrategy]].
*/
def withSupervisionStrategy(decider: Supervision.Decider): ActorFlowMaterializerSettings =
copy(supervisionDecider = decider)
/**
* Java API: Decides how exceptions from application code are to be handled, unless
* overridden for specific sections of the stream operations with
* [[akka.stream.javadsl.OperationAttributes#supervisionStrategy]].
*/
def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings =
copy(supervisionDecider = e ⇒ decider.apply(e))
def withDebugLogging(enable: Boolean): ActorFlowMaterializerSettings =
copy(debugLogging = enable)

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.util.control.NonFatal
import akka.stream.javadsl.japi
object Supervision {
sealed trait Directive
/**
* The stream will be completed with failure if application code for processing an element
* throws an exception.
*/
case object Stop extends Directive
/**
* Java API: The stream will be completed with failure if application code for processing an element
* throws an exception.
*/
def stop = Stop
/**
* The element is dropped and the stream continues if application code for processing
* an element throws an exception.
*/
case object Resume extends Directive
/**
* Java API: The element is dropped and the stream continues if application code for processing
* an element throws an exception.
*/
def resume = Resume
/**
* The element is dropped and the stream continues after restarting the stage
* if application code for processing an element throws an exception.
* Restarting a stage means that any accumulated state is cleared. This is typically
* performed by creating a new instance of the stage.
*/
case object Restart extends Directive
/**
* Java API: The element is dropped and the stream continues after restarting the stage
* if application code for processing an element throws an exception.
* Restarting a stage means that any accumulated state is cleared. This is typically
* performed by creating a new instance of the stage.
*/
def restart = Restart
type Decider = Function[Throwable, Directive]
/**
* Scala API: [[Decider]] that returns [[Stop]] for all exceptions.
*/
val stoppingDecider: Decider = {
case NonFatal(_) ⇒ Stop
}
/**
* Java API: Decider function that returns [[#stop]] for all exceptions.
*/
val getStoppingDecider: japi.Function[Throwable, Directive] =
new japi.Function[Throwable, Directive] {
override def apply(e: Throwable): Directive = stoppingDecider(e)
}
/**
* Scala API: [[Decider]] that returns [[Resume]] for all exceptions.
*/
val resumingDecider: Decider = {
case NonFatal(_) ⇒ Resume
}
/**
* Java API: Decider function that returns [[#resume]] for all exceptions.
*/
val getResumingDecider: japi.Function[Throwable, Directive] =
new japi.Function[Throwable, Directive] {
override def apply(e: Throwable): Directive = resumingDecider(e)
}
/**
* Scala API: [[Decider]] that returns [[Restart]] for all exceptions.
*/
val restartingDecider: Decider = {
case NonFatal(_) ⇒ Restart
}
/**
* Java API: Decider function that returns [[#restart]] for all exceptions.
*/
val getRestartingDecider: japi.Function[Throwable, Directive] =
new japi.Function[Throwable, Directive] {
override def apply(e: Throwable): Directive = restartingDecider(e)
}
}

View file

@ -317,16 +317,16 @@ case class ActorFlowMaterializerImpl(
// Optimizations below
case noMatch if !optimizations.fusion ⇒ prev
case Ast.Map(f, _) ⇒ fusing.Map(f) :: prev
case Ast.Filter(p, _) ⇒ fusing.Filter(p) :: prev
case Ast.Map(f, att) ⇒ fusing.Map(f, att.settings(settings).supervisionDecider) :: prev
case Ast.Filter(p, att) ⇒ fusing.Filter(p, att.settings(settings).supervisionDecider) :: prev
case Ast.Drop(n, _) ⇒ fusing.Drop(n) :: prev
case Ast.Take(n, _) ⇒ fusing.Take(n) :: prev
case Ast.Collect(pf, _) ⇒ fusing.Collect(pf) :: prev
case Ast.Scan(z, f, _) ⇒ fusing.Scan(z, f) :: prev
case Ast.Collect(pf, att) ⇒ fusing.Collect(att.settings(settings).supervisionDecider)(pf) :: prev
case Ast.Scan(z, f, att) ⇒ fusing.Scan(z, f, att.settings(settings).supervisionDecider) :: prev
case Ast.Expand(s, f, _) ⇒ fusing.Expand(s, f) :: prev
case Ast.Conflate(s, f, _) ⇒ fusing.Conflate(s, f) :: prev
case Ast.Conflate(s, f, att) ⇒ fusing.Conflate(s, f, att.settings(settings).supervisionDecider) :: prev
case Ast.Buffer(n, s, _) ⇒ fusing.Buffer(n, s) :: prev
case Ast.MapConcat(f, _) ⇒ fusing.MapConcat(f) :: prev
case Ast.MapConcat(f, att) ⇒ fusing.MapConcat(f, att.settings(settings).supervisionDecider) :: prev
case Ast.Grouped(n, _) ⇒ fusing.Grouped(n) :: prev
//FIXME Add more fusion goodies here
case _ ⇒ prev
@ -556,22 +556,32 @@ private[akka] object ActorProcessorFactory {
val settings = materializer.settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
op match {
case Fused(ops, _) ⇒ ActorInterpreter.props(settings, ops)
case Map(f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Map(f)))
case Filter(p, _) ⇒ ActorInterpreter.props(settings, List(fusing.Filter(p)))
// FIXME this way of grabbing the supervisionDecider feels very inefficient
case Map(f, att) ⇒
ActorInterpreter.props(settings, List(fusing.Map(f, att.settings(settings).supervisionDecider)))
case Filter(p, att) ⇒
ActorInterpreter.props(settings, List(fusing.Filter(p, att.settings(settings).supervisionDecider)))
case Drop(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Drop(n)))
case Take(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Take(n)))
case Collect(pf, _) ⇒ ActorInterpreter.props(settings, List(fusing.Collect(pf)))
case Scan(z, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Scan(z, f)))
case Collect(pf, att) ⇒
ActorInterpreter.props(settings, List(fusing.Collect(att.settings(settings).supervisionDecider)(pf)))
case Scan(z, f, att) ⇒
ActorInterpreter.props(settings, List(fusing.Scan(z, f, att.settings(settings).supervisionDecider)))
case Expand(s, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Expand(s, f)))
case Conflate(s, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Conflate(s, f)))
case Conflate(s, f, att) ⇒
ActorInterpreter.props(settings, List(fusing.Conflate(s, f, att.settings(settings).supervisionDecider)))
case Buffer(n, s, _) ⇒ ActorInterpreter.props(settings, List(fusing.Buffer(n, s)))
case MapConcat(f, _) ⇒ ActorInterpreter.props(settings, List(fusing.MapConcat(f)))
case MapAsync(f, _) ⇒ MapAsyncProcessorImpl.props(settings, f)
case MapAsyncUnordered(f, _) ⇒ MapAsyncUnorderedProcessorImpl.props(settings, f)
case MapConcat(f, att) ⇒
ActorInterpreter.props(settings, List(fusing.MapConcat(f, att.settings(settings).supervisionDecider)))
case MapAsync(f, att) ⇒ MapAsyncProcessorImpl.props(att.settings(settings), f)
case MapAsyncUnordered(f, att) ⇒ MapAsyncUnorderedProcessorImpl.props(att.settings(settings), f)
// FIXME always amend settings with att.settings(settings)
case Grouped(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Grouped(n)))
case GroupBy(f, _) ⇒ GroupByProcessorImpl.props(settings, f)
case GroupBy(f, att) ⇒
GroupByProcessorImpl.props(att.settings(settings), f)
case PrefixAndTail(n, _) ⇒ PrefixAndTailImpl.props(settings, n)
case SplitWhen(p, _) ⇒ SplitWhenProcessorImpl.props(settings, p)
case SplitWhen(p, att) ⇒
SplitWhenProcessorImpl.props(att.settings(settings), p)
case ConcatAll(_) ⇒ ConcatAllImpl.props(materializer) //FIXME closes over the materializer, is this good?
case StageFactory(mkStage, _) ⇒ ActorInterpreter.props(settings, List(mkStage()))
case TimerTransform(mkStage, _) ⇒ TimerTransformerProcessorsImpl.props(settings, mkStage())
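
For context, the decider consulted above comes from the materializer settings, optionally overridden per operator via `att.settings(settings)`. A minimal sketch of installing a default decider, assuming the era's `ActorFlowMaterializer` factory and `ActorFlowMaterializerSettings(system)` constructor (names of the system and decider are illustrative):

    import akka.actor.ActorSystem
    import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings, Supervision }

    implicit val system = ActorSystem("supervision-demo")

    // Resume on arithmetic errors, stop (the default) on everything else
    val decider: Supervision.Decider = {
      case _: ArithmeticException ⇒ Supervision.Resume
      case _                      ⇒ Supervision.Stop
    }

    implicit val materializer = ActorFlowMaterializer(
      ActorFlowMaterializerSettings(system).withSupervisionStrategy(decider))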

View file

@@ -3,9 +3,11 @@
*/
package akka.stream.impl
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.scaladsl.Source
import scala.util.control.NonFatal
import akka.actor.Props
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.Supervision
import akka.stream.scaladsl.Source
/**
* INTERNAL API
@@ -13,6 +15,8 @@ import akka.actor.Props
private[akka] object GroupByProcessorImpl {
def props(settings: ActorFlowMaterializerSettings, keyFor: Any ⇒ Any): Props =
Props(new GroupByProcessorImpl(settings, keyFor))
private case object Drop
}
/**
@@ -22,7 +26,9 @@ private[akka] class GroupByProcessorImpl(settings: ActorFlowMaterializerSettings
extends MultiStreamOutputProcessor(settings) {
import MultiStreamOutputProcessor._
import GroupByProcessorImpl.Drop
val decider = settings.supervisionDecider
var keyToSubstreamOutput = collection.mutable.Map.empty[Any, SubstreamOutput]
var pendingSubstreamOutput: SubstreamOutput = _
@@ -30,21 +36,33 @@ private[akka] class GroupByProcessorImpl(settings: ActorFlowMaterializerSettings
// No substream is open yet. If downstream cancels now, we are complete
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
nextPhase(openSubstream(elem, key))
tryKeyFor(elem) match {
case Drop ⇒ // supervision decided to drop the element
case key ⇒ nextPhase(openSubstream(elem, key))
}
}
// some substreams are open now. If downstream cancels, we still continue until the substreams are closed
val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
tryKeyFor(elem) match {
case Drop ⇒ // supervision decided to drop the element
case key ⇒
keyToSubstreamOutput.get(key) match {
case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutput(key)))
case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key))
case _ ⇒ // stay
}
}
}
private def tryKeyFor(elem: Any): Any =
try keyFor(elem) catch {
case NonFatal(e) if decider(e) != Supervision.Stop ⇒
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from groupBy function: {}", elem, e.getMessage)
Drop
}
def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemandOrCancel) { () ⇒
if (primaryOutputs.isClosed) {
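
Taken together with `tryKeyFor`, a failing `keyFor` function now drops the element under `Resume`/`Restart` instead of failing the whole stream. A hedged usage sketch, assuming the materializer from the earlier sketch with a decider that resumes on the thrown exception type:

    import akka.stream.scaladsl.{ Sink, Source }

    // charAt(0) throws StringIndexOutOfBoundsException for "", which a resuming
    // decider turns into a dropped element; the other words are grouped normally.
    Source(List("apple", "", "banana", "avocado"))
      .groupBy(word ⇒ word.charAt(0))
      .runWith(Sink.foreach { case (key, words) ⇒ println(s"substream for '$key'") })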

View file

@@ -13,6 +13,7 @@ import akka.pattern.pipe
import scala.annotation.tailrec
import akka.actor.Props
import akka.actor.DeadLetterSuppression
import akka.stream.Supervision
/**
* INTERNAL API
@@ -32,6 +33,7 @@ private[akka] object MapAsyncProcessorImpl {
final case class FutureElement(seqNo: Long, element: Any) extends DeadLetterSuppression
final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression
final case class RecoveredError(in: Any, cause: Throwable)
}
/**
@@ -44,6 +46,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
// Execution context for pipeTo and friends
import context.dispatcher
val decider = settings.supervisionDecider
var submittedSeqNo = 0L
var doneSeqNo = 0L
def gap: Long = submittedSeqNo - doneSeqNo
@@ -54,7 +57,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
// keep future results arriving too early in a buffer sorted by seqNo
var orderedBuffer = TreeSet.empty[FutureElement]
override def activeReceive = futureReceive orElse super.activeReceive
override def activeReceive = futureReceive.orElse[Any, Unit](super.activeReceive)
def drainBuffer(): List[Any] = {
@@ -94,10 +97,10 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
if (!primaryOutputs.demandAvailable) throw new IllegalStateException
if (orderedBuffer.isEmpty) {
primaryOutputs.enqueueOutputElement(element)
emit(element)
} else {
primaryOutputs.enqueueOutputElement(element)
drainBuffer() foreach primaryOutputs.enqueueOutputElement
emit(element)
drainBuffer() foreach emit
}
pump()
} else {
@@ -110,6 +113,14 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
fail(cause)
}
def emit(element: Any): Unit = element match {
case RecoveredError(in, err) ⇒
if (settings.debugLogging)
log.debug("Dropped element [{}] because the mapAsync future was completed with exception: {}", in, err.getMessage)
case elem ⇒
primaryOutputs.enqueueOutputElement(element)
}
override def onError(e: Throwable): Unit = {
// propagate upstream failure immediately
fail(e)
@@ -126,16 +137,27 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
nextPhase(completedPhase)
} else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) {
val elem = primaryInputs.dequeueInputElement()
try {
val future = f(elem)
submittedSeqNo += 1
val seqNo = submittedSeqNo
try {
f(elem).map(FutureElement(seqNo, _)).recover {
future.map(FutureElement(seqNo, _)).recover {
case err: Throwable if decider(err) != Supervision.Stop ⇒
FutureElement(seqNo, RecoveredError(elem, err))
case err ⇒ FutureFailure(err)
}.pipeTo(self)
} catch {
case NonFatal(err) ⇒
// f threw, propagate failure immediately
// f threw, handle failure immediately
decider(err) match {
case Supervision.Stop ⇒
fail(err)
case Supervision.Resume | Supervision.Restart ⇒
// submittedSeqNo was not increased, just continue
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from mapAsync factory function: {}", elem, err.getMessage)
}
}
}
}
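
In user-facing terms: a `Future` whose failure the decider maps to `Resume` or `Restart` becomes a `RecoveredError`, which `emit` then drops. A hedged sketch (materializer setup as above, with a decider that resumes on `NoSuchElementException`; `lookup` is a hypothetical helper):

    import scala.concurrent.Future
    import akka.stream.scaladsl.{ Sink, Source }

    def lookup(id: Int): Future[String] =
      if (id % 2 == 0) Future.successful(s"user-$id")
      else Future.failed(new NoSuchElementException(s"no user for $id"))

    // Odd ids fail their Future and are dropped; even ids flow through in order.
    Source(1 to 6).mapAsync(lookup).runWith(Sink.foreach(println))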

View file

@@ -10,6 +10,7 @@ import akka.stream.ActorFlowMaterializerSettings
import akka.pattern.pipe
import akka.actor.Props
import akka.actor.DeadLetterSuppression
import akka.stream.Supervision
/**
* INTERNAL API
@@ -19,7 +20,7 @@ private[akka] object MapAsyncUnorderedProcessorImpl {
Props(new MapAsyncUnorderedProcessorImpl(settings, f))
final case class FutureElement(element: Any) extends DeadLetterSuppression
final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression
final case class FutureFailure(in: Any, cause: Throwable) extends DeadLetterSuppression
}
/**
@@ -32,9 +33,10 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali
// Execution context for pipeTo and friends
import context.dispatcher
val decider = settings.supervisionDecider
var inProgressCount = 0
override def activeReceive = futureReceive orElse super.activeReceive
override def activeReceive = futureReceive.orElse[Any, Unit](super.activeReceive)
def futureReceive: Receive = {
case FutureElement(element) ⇒
@@ -46,8 +48,17 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali
primaryOutputs.enqueueOutputElement(element)
pump()
case FutureFailure(cause) ⇒
fail(cause)
case FutureFailure(in, err) ⇒
decider(err) match {
case Supervision.Stop ⇒
fail(err)
case Supervision.Resume | Supervision.Restart ⇒
inProgressCount -= 1
if (settings.debugLogging)
log.debug("Dropped element [{}] because the mapAsyncUnordered future was completed with exception: {}",
in, err.getMessage)
pump()
}
}
override def onError(e: Throwable): Unit = {
@@ -66,15 +77,23 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali
nextPhase(completedPhase)
} else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - inProgressCount > 0) {
val elem = primaryInputs.dequeueInputElement()
inProgressCount += 1
try {
f(elem).map(FutureElement.apply).recover {
case err ⇒ FutureFailure(err)
val future = f(elem)
inProgressCount += 1
future.map(FutureElement.apply).recover {
case err ⇒ FutureFailure(elem, err)
}.pipeTo(self)
} catch {
case NonFatal(err) ⇒
// f threw, propagate failure immediately
decider(err) match {
case Supervision.Stop ⇒
fail(err)
case Supervision.Resume | Supervision.Restart ⇒
// inProgressCount was not increased, just continue
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from mapAsyncUnordered factory function: {}", elem, err.getMessage)
}
}
}
}
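
The recover-into-message pattern above (wrap the result, and on failure send a message that still carries the input element) is what lets the decider log and drop the exact element. A standalone sketch of the same pattern with plain actors; the names `Worker`, `Computed` and `Failed` are hypothetical:

    import akka.actor.Actor
    import akka.pattern.pipe
    import scala.concurrent.Future
    import scala.util.control.NonFatal

    case class Computed(result: Int)
    case class Failed(in: Int, cause: Throwable)

    class Worker(f: Int ⇒ Future[Int]) extends Actor {
      import context.dispatcher

      def receive = {
        case Computed(result)  ⇒ println(s"emit $result")
        case Failed(in, cause) ⇒ println(s"drop $in: ${cause.getMessage}") // decider hook goes here
        case elem: Int ⇒
          // keep the input element alongside the failure so it can be reported
          f(elem).map(Computed.apply).recover { case NonFatal(e) ⇒ Failed(elem, e) }.pipeTo(self)
      }
    }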

View file

@@ -3,9 +3,11 @@
*/
package akka.stream.impl
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.scaladsl.Source
import scala.util.control.NonFatal
import akka.actor.Props
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.Supervision
import akka.stream.scaladsl.Source
/**
* INTERNAL API
@@ -13,6 +15,11 @@ import akka.actor.Props
private[akka] object SplitWhenProcessorImpl {
def props(settings: ActorFlowMaterializerSettings, splitPredicate: Any ⇒ Boolean): Props =
Props(new SplitWhenProcessorImpl(settings, splitPredicate))
private trait SplitDecision
private case object Split extends SplitDecision
private case object Continue extends SplitDecision
private case object Drop extends SplitDecision
}
/**
@@ -22,7 +29,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSetti
extends MultiStreamOutputProcessor(_settings) {
import MultiStreamOutputProcessor._
import SplitWhenProcessorImpl._
val decider = settings.supervisionDecider
var currentSubstream: SubstreamOutput = _
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒
@@ -46,17 +55,31 @@ private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSetti
// Note that this phase is allocated only once per _slice_ and not per element
def serveSubstreamRest(substream: SubstreamOutput) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒
val elem = primaryInputs.dequeueInputElement()
if (splitPredicate(elem)) {
decideSplit(elem) match {
case Continue ⇒ substream.enqueueOutputElement(elem)
case Split ⇒
completeSubstreamOutput(currentSubstream.key)
currentSubstream = null
nextPhase(openSubstream(elem))
} else substream.enqueueOutputElement(elem)
case Drop ⇒ // drop elem and continue
}
}
// Ignore elements for a cancelled substream until a new substream needs to be opened
val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { () ⇒
val elem = primaryInputs.dequeueInputElement()
if (splitPredicate(elem)) nextPhase(openSubstream(elem))
decideSplit(elem) match {
case Continue | Drop ⇒ // ignore elem
case Split ⇒ nextPhase(openSubstream(elem))
}
}
private def decideSplit(elem: Any): SplitDecision =
try if (splitPredicate(elem)) Split else Continue catch {
case NonFatal(e) if decider(e) != Supervision.Stop ⇒
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from splitWhen function: {}", elem, e.getMessage)
Drop
}
nextPhase(waitFirst)
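
A usage-level sketch of the new behavior: a predicate that throws on bad input no longer cancels the stream and its substreams when the decider resumes (materializer setup as in the earlier sketch; resuming on `IllegalArgumentException`):

    import akka.stream.scaladsl.{ Sink, Source }

    // -1 makes the predicate throw; with a resuming decider it is dropped and
    // the current substream simply continues.
    Source(List(1, 2, -1, 7, 3))
      .splitWhen { n ⇒
        if (n < 0) throw new IllegalArgumentException(s"negative: $n")
        n > 5
      }
      .runWith(Sink.foreach(substream ⇒ substream.runWith(Sink.ignore)))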

View file

@@ -7,6 +7,7 @@ import scala.annotation.tailrec
import scala.collection.breakOut
import scala.util.control.NonFatal
import akka.stream.stage._
import akka.stream.Supervision
// TODO:
// fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions)
@@ -30,6 +31,11 @@ import akka.stream.stage._
private[akka] abstract class BoundaryStage extends AbstractStage[Any, Any, Directive, Directive, BoundaryContext] {
private[fusing] var bctx: BoundaryContext = _
def enter(): BoundaryContext = bctx
final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop
final override def restart(): BoundaryStage =
throw new UnsupportedOperationException("BoundaryStage doesn't support restart")
}
/**
@@ -318,6 +324,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
currentOp.allowedToPush = true
super.hold()
}
}
private final val Completing: State = new State {
@@ -396,7 +403,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
("----" * (activeOpIndex - jumpBacks(activeOpIndex) - 1))
case Completing ⇒ padding + "---|"
case Cancelling ⇒ padding + "|---"
case Failing(e) ⇒ padding + s"---X ${e.getMessage}"
case Failing(e) ⇒ padding + s"---X ${e.getMessage} => ${decide(e)}"
case other ⇒ padding + s"---? $state"
}
println(icon)
@@ -410,7 +417,13 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
} catch {
case NonFatal(e) if lastOpFailing != activeOpIndex ⇒
lastOpFailing = activeOpIndex
state.fail(e)
decide(e) match {
case Supervision.Stop ⇒ state.fail(e)
case Supervision.Resume ⇒ state.pull()
case Supervision.Restart ⇒
pipeline(activeOpIndex) = pipeline(activeOpIndex).restart().asInstanceOf[UntypedOp]
state.pull()
}
}
}
@@ -425,6 +438,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
}
def decide(e: Throwable): Supervision.Directive =
if (state == Pulling || state.isHolding) Supervision.Stop
else currentOp.decide(e)
/**
* Forks off execution of the pipeline by saving current position, fully executing the effects of the given
* forkState then setting back the position to the saved value.
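
The interpreter's `decide`/`restart` hooks are what custom stages plug into. A hypothetical `PushStage` whose parse failures resume; the interpreter then calls `state.pull()` rather than failing the stream:

    import akka.stream.Supervision
    import akka.stream.stage._

    class ParseInt extends PushStage[String, Int] {
      override def onPush(elem: String, ctx: Context[Int]): Directive =
        ctx.push(elem.toInt) // throws NumberFormatException on garbage

      override def decide(t: Throwable): Supervision.Directive = t match {
        case _: NumberFormatException ⇒ Supervision.Resume // drop the element, pull the next
        case _                        ⇒ Supervision.Stop
      }
    }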

View file

@@ -7,21 +7,26 @@ import scala.collection.immutable
import akka.stream.OverflowStrategy
import akka.stream.impl.FixedSizeBuffer
import akka.stream.stage._
import akka.stream.Supervision
/**
* INTERNAL API
*/
private[akka] final case class Map[In, Out](f: In ⇒ Out) extends PushStage[In, Out] {
private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] {
override def onPush(elem: In, ctx: Context[Out]): Directive = ctx.push(f(elem))
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
/**
* INTERNAL API
*/
private[akka] final case class Filter[T](p: T ⇒ Boolean) extends PushStage[T, T] {
private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): Directive =
if (p(elem)) ctx.push(elem)
else ctx.pull()
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
private[akka] final object Collect {
@@ -31,19 +36,21 @@ private[akka] final object Collect {
final val NotApplied: Any ⇒ Any = _ ⇒ Collect.NotApplied
}
private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends PushStage[In, Out] {
private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf: PartialFunction[In, Out]) extends PushStage[In, Out] {
import Collect.NotApplied
override def onPush(elem: In, ctx: Context[Out]): Directive =
pf.applyOrElse(elem, NotApplied) match {
case NotApplied ⇒ ctx.pull()
case result: Out @unchecked ⇒ ctx.push(result)
}
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
/**
* INTERNAL API
*/
private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends PushPullStage[In, Out] {
private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var currentIterator: Iterator[Out] = Iterator.empty
override def onPush(elem: In, ctx: Context[Out]): Directive = {
@@ -59,6 +66,10 @@ private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out])
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
ctx.absorbTermination()
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def restart(): MapConcat[In, Out] = copy()
}
/**
@@ -90,7 +101,7 @@ private[akka] final case class Drop[T](count: Int) extends PushStage[T, T] {
/**
* INTERNAL API
*/
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends PushPullStage[In, Out] {
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctx: Context[Out]): Directive = {
@@ -104,12 +115,16 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex
else ctx.pull()
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = ctx.absorbTermination()
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def restart(): Scan[In, Out] = copy()
}
/**
* INTERNAL API
*/
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends PushPullStage[In, Out] {
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctx: Context[Out]): Directive = {
@@ -122,6 +137,10 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex
else ctx.pull()
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = ctx.absorbTermination()
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def restart(): Fold[In, Out] = copy()
}
/**
@@ -229,11 +248,13 @@ private[akka] final case class Completed[T]() extends PushPullStage[T, T] {
/**
* INTERNAL API
*/
private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedStage[In, Out] {
private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out,
decider: Supervision.Decider) extends DetachedStage[In, Out] {
private var agg: Any = null
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
agg = if (agg == null) seed(elem)
agg =
if (agg == null) seed(elem)
else aggregate(agg.asInstanceOf[Out], elem)
if (!ctx.isHolding) ctx.pull()
@@ -255,12 +276,17 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
} else if (agg == null) ctx.hold()
else {
val result = agg.asInstanceOf[Out]
if (result == null) throw new NullPointerException
agg = null
ctx.push(result)
}
}
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = ctx.absorbTermination()
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def restart(): Conflate[In, Out] = copy()
}
/**
@@ -302,4 +328,9 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol
if (expanded) ctx.finish()
else ctx.absorbTermination()
}
final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop
final override def restart(): Expand[In, Out, Seed] =
throw new UnsupportedOperationException("Expand doesn't support restart")
}
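
Behaviorally, `restart() = copy()` means a `Restart` decision rebuilds the stage from its constructor arguments, so `Scan`'s aggregator falls back to `zero`. A hedged end-to-end sketch, assuming the decider is installed on the materializer settings as in the first sketch:

    import akka.stream.Supervision
    import akka.stream.scaladsl.{ Sink, Source }

    val restartOnArithmetic: Supervision.Decider = {
      case _: ArithmeticException ⇒ Supervision.Restart
      case _                      ⇒ Supervision.Stop
    }

    // The division by zero drops that element and, because of Restart,
    // the running aggregate starts over from the `zero` value.
    Source(List(1, 2, 0, 4))
      .scan(0)((acc, n) ⇒ acc + 100 / n)
      .runWith(Sink.foreach(println))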

View file

@@ -152,6 +152,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* downstream may run in parallel and may complete in any order, but the elements that
* are emitted downstream are in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
* [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] =
@@ -165,6 +173,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
* in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
* [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T] =
@@ -198,6 +214,10 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision#restart]] the current value starts at `zero` again
* and the stream will continue.
*/
def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T] =
new Flow(delegate.scan(zero)(f.apply))
@@ -275,6 +295,9 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
* subscriber.
*
* Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]].
* Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure.
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
@@ -322,6 +345,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* stop this processor from processing more elements, therefore you must take
* care to unblock (or cancel) all of the produced streams even if you want
* to consume only one of them.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision#stop]] the stream and substreams will be completed
* with failure.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
* the element is dropped and the stream and substreams continue.
*/
def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance]]] =
new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step
@@ -338,6 +369,14 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* true, false, // elements go into second substream
* true, false, false // elements go into third substream
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision#stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
* the element is dropped and the stream and substreams continue.
*/
def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out]] =
new Flow(delegate.splitWhen(p.test).map(_.asJava))

View file

@@ -4,6 +4,7 @@
package akka.stream.javadsl
import akka.stream.scaladsl
import akka.stream.Supervision
/**
* Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]]
@@ -50,6 +51,14 @@ object OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.dispatcher(dispatcher)
}
/**
* Decides how exceptions from application code are to be handled.
*/
def supervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): OperationAttributes =
new OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.supervisionStrategy(e ⇒ decider.apply(e))
}
private[akka] val none: OperationAttributes = new OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.none
}

View file

@@ -160,6 +160,14 @@ trait FlowOps[+Out] {
* downstream may run in parallel and may complete in any order, but the elements that
* are emitted downstream are in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
* [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](f: Out ⇒ Future[T]): Repr[T] =
@@ -173,6 +181,14 @@ trait FlowOps[+Out] {
* as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
* in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
* [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[T] =
@@ -203,6 +219,10 @@ trait FlowOps[+Out] {
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] the current value starts at `zero` again
* and the stream will continue.
*/
def scan[T](zero: T)(f: (T, Out) ⇒ T): Repr[T] = andThen(Scan(zero, f.asInstanceOf[(Any, Any) ⇒ Any]))
@@ -325,6 +345,9 @@ trait FlowOps[+Out] {
* This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
* subscriber.
*
* Expand does not support [[akka.stream.Supervision.Restart]] and [[akka.stream.Supervision.Resume]].
* Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure.
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
@@ -369,6 +392,14 @@ trait FlowOps[+Out] {
* stop this processor from processing more elements, therefore you must take
* care to unblock (or cancel) all of the produced streams even if you want
* to consume only one of them.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the group by function `f` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*/
def groupBy[K, U >: Out](f: Out ⇒ K): Repr[(K, Source[U])] =
andThen(GroupBy(f.asInstanceOf[Any ⇒ Any]))
@@ -385,6 +416,14 @@ trait FlowOps[+Out] {
* true, false, // elements go into second substream
* true, false, false // elements go into third substream
* }}}
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Stop]] the stream and substreams will be completed
* with failure.
*
* If the split predicate `p` throws an exception and the supervision decision
* is [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]]
* the element is dropped and the stream and substreams continue.
*/
def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U]] =
andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean]))
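
To make the FlowOps contracts above concrete, a hedged sketch of `mapAsyncUnordered` under a resuming decider (materializer setup as in the earlier sketch; `fetch` is a hypothetical helper): a failed `Future` drops its element, while successful ones may arrive out of order.

    import scala.concurrent.Future
    import akka.stream.scaladsl.{ Sink, Source }

    def fetch(id: Int): Future[String] =
      if (id == 3) Future.failed(new NoSuchElementException("no page 3"))
      else Future.successful(s"page-$id")

    // page-3 is dropped; the remaining pages are printed as they complete.
    Source(1 to 5).mapAsyncUnordered(fetch).runWith(Sink.foreach(println))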

View file

@@ -5,6 +5,7 @@ package akka.stream.scaladsl
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.impl.Ast.AstNode
import akka.stream.Supervision
/**
* Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]]
@@ -17,11 +18,10 @@ final case class OperationAttributes private (private val attributes: List[Opera
/**
* Adds given attributes to the end of these attributes.
*/
def and(other: OperationAttributes): OperationAttributes = {
// FIXME should return `this` if other.attributes is empty
// FIXME should return `other` if this is `none`
OperationAttributes(attributes ::: other.attributes)
}
def and(other: OperationAttributes): OperationAttributes =
if (attributes.isEmpty) other
else if (other.attributes.isEmpty) this
else OperationAttributes(attributes ::: other.attributes)
private[akka] def nameLifted: Option[String] =
attributes.collect {
@@ -37,6 +37,8 @@ final case class OperationAttributes private (private val attributes: List[Opera
attributes.collect {
case InputBuffer(initial, max) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withInputBuffer(initial, max)
case Dispatcher(dispatcher) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withDispatcher(dispatcher)
case SupervisionStrategy(decider) ⇒ (s: ActorFlowMaterializerSettings) ⇒
s.withSupervisionStrategy(decider)
}.reduceOption(_ andThen _).getOrElse(identity) // FIXME is this the optimal way of encoding this?
private[akka] def transform(node: AstNode): AstNode =
@@ -62,6 +64,7 @@ object OperationAttributes {
private[OperationAttributes] final case class Name(n: String) extends Attribute
private[OperationAttributes] final case class InputBuffer(initial: Int, max: Int) extends Attribute
private[OperationAttributes] final case class Dispatcher(dispatcher: String) extends Attribute
private[OperationAttributes] final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute
private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes =
apply(List(attribute))
@@ -82,4 +85,10 @@
* Specifies the name of the dispatcher.
*/
def dispatcher(dispatcher: String): OperationAttributes = OperationAttributes(Dispatcher(dispatcher))
/**
* Decides how exceptions from user code are to be handled.
*/
def supervisionStrategy(decider: Supervision.Decider): OperationAttributes =
OperationAttributes(SupervisionStrategy(decider))
}
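
Attributes compose with `and`, so a supervision strategy can travel together with, for example, a dispatcher setting and override the materializer default wherever the attributes are applied. A small sketch using only factories visible in this diff:

    import akka.stream.Supervision
    import akka.stream.scaladsl.OperationAttributes

    val resumeOnParseErrors: Supervision.Decider = {
      case _: NumberFormatException ⇒ Supervision.Resume
      case _                        ⇒ Supervision.Stop
    }

    val attributes = OperationAttributes.dispatcher("my-blocking-dispatcher") and
      OperationAttributes.supervisionStrategy(resumeOnParseErrors)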

View file

@@ -3,6 +3,8 @@
*/
package akka.stream.stage
import akka.stream.Supervision
/**
* General interface for stream transformation.
*
@@ -85,6 +87,25 @@ private[stream] abstract class AbstractStage[-In, Out, PushD <: Directive, PullD
*/
def onUpstreamFailure(cause: Throwable, ctx: Ctx): TerminationDirective = ctx.fail(cause)
/**
* If an exception is thrown from [[#onPush]] this method is invoked to decide how
* to handle the exception. By default this method returns [[Supervision.Stop]].
*
* If an exception is thrown from [[#onPull]] the stream will always be completed with
* failure, because it is not always possible to recover from that state.
* In concrete stages it is of course possible to use ordinary try-catch-recover inside
* `onPull` when it is known how to recover from such exceptions.
*
*/
def decide(t: Throwable): Supervision.Directive = Supervision.Stop
/**
* Used to create a fresh instance of the stage after an error resulting in a [[Supervision.Restart]]
* directive. By default it will return the same instance untouched, so you must override it
* if there is any state that should be cleared before restarting, e.g. by returning a new instance.
*/
def restart(): Stage[In, Out] = this
}
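
A sketch of a stateful stage honoring `Restart` by returning a fresh instance, mirroring how `Scan` and `Fold` implement `restart()` via `copy()`; the stage name and unconditional Restart decision are illustrative:

    import akka.stream.Supervision
    import akka.stream.stage._

    class RunningSum extends PushPullStage[Int, Int] {
      private var acc = 0 // accumulated state that must not survive a restart

      override def onPush(elem: Int, ctx: Context[Int]): Directive = {
        acc += elem
        ctx.push(acc)
      }

      override def onPull(ctx: Context[Int]): Directive = ctx.pull()

      override def decide(t: Throwable): Supervision.Directive = Supervision.Restart
      override def restart(): RunningSum = new RunningSum // acc starts at 0 again
    }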
/**
@@ -164,7 +185,20 @@ abstract class PushStage[In, Out] extends PushPullStage[In, Out] {
*
* @see [[PushPullStage]]
*/
abstract class DetachedStage[In, Out] extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]]
abstract class DetachedStage[In, Out] extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]] {
/**
* If an exception is thrown from [[#onPush]] this method is invoked to decide how
* to handle the exception. By default this method returns [[Supervision.Stop]].
*
* If an exception is thrown from [[#onPull]] or if the stage is holding state the stream
* will always be completed with failure, because it is not always possible to recover from
* that state.
* In concrete stages it is of course possible to use ordinary try-catch-recover inside
* `onPull` when it is known how to recover from such exceptions.
*/
override def decide(t: Throwable): Supervision.Directive = super.decide(t)
}
/**
* The behavior of [[StatefulStage]] is defined by these two methods, which