diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala index 02dad695a0..fbddb1d988 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala @@ -4,12 +4,16 @@ package akka.stream.scaladsl import akka.event.{ DummyClassForStringSources, Logging } +import akka.stream.ActorAttributes._ import akka.stream.Attributes.LogLevels +import akka.stream.Supervision._ import akka.stream.testkit.{ AkkaSpec, ScriptedTest } import akka.stream.javadsl import akka.stream.{ ActorMaterializer, Materializer, Attributes } import akka.testkit.TestProbe +import scala.concurrent.duration._ +import scala.concurrent.Await import scala.util.control.NoStackTrace class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest { @@ -141,6 +145,13 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest { .runWith(Sink.ignore) logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-6e] Upstream failed, cause: FlowLogSpec$TestException: Boom!")) } + + "follow supervision strategy when exception thrown" in { + val ex = new RuntimeException() with NoStackTrace + val future = Source(1 to 5).log("hi", n ⇒ throw ex) + .withAttributes(supervisionStrategy(resumingDecider)).runWith(Sink.fold(0)(_ + _)) + Await.result(future, 500.millis) shouldEqual 0 + } } "on javadsl.Source" must { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 6362def624..539d0ab787 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -8,14 +8,11 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } import akka.actor._ import akka.dispatch.Dispatchers import akka.pattern.ask -import akka.stream.actor.ActorSubscriber +import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule } import akka.stream.impl.io.SslTlsCipherActor -import akka.stream._ import akka.stream.io.SslTls.TlsModule -import akka.stream.stage.Stage -import akka.stream.Attributes._ import org.reactivestreams._ import scala.concurrent.duration.FiniteDuration diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 2640e4d954..12fe659fbf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -140,7 +140,7 @@ private[stream] object Stages { } final case class Log[T](name: String, extract: T ⇒ Any, loggingAdapter: Option[LoggingAdapter], attributes: Attributes = log) extends SymbolicStage[T, T] { - override def create(attr: Attributes): Stage[T, T] = fusing.Log(name, extract, loggingAdapter) + override def create(attr: Attributes): Stage[T, T] = fusing.Log(name, extract, loggingAdapter, supervision(attr)) } final case class Filter[T](p: T ⇒ Boolean, attributes: Attributes = filter) extends SymbolicStage[T, T] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 9e1ce386c3..9fcff50e7a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -686,7 +686,9 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I /** * INTERNAL API */ -private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapter: Option[LoggingAdapter]) extends PushStage[T, T] { +private[akka] final case class Log[T](name: String, extract: T ⇒ Any, + logAdapter: Option[LoggingAdapter], + decider: Supervision.Decider) extends PushStage[T, T] { import Log._ @@ -744,6 +746,7 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt + override def decide(t: Throwable): Supervision.Directive = decider(t) } /**