Merge pull request #18820 from agolubev/agolubev-#18037_excpetion_from_FlowOps.log

=str #18037 FlowOps.log exception handler does not follow strategy
This commit is contained in:
drewhk 2015-11-03 12:29:12 +01:00
commit 38d94d7cbe
4 changed files with 17 additions and 6 deletions

View file

@ -4,12 +4,16 @@
package akka.stream.scaladsl
import akka.event.{ DummyClassForStringSources, Logging }
import akka.stream.ActorAttributes._
import akka.stream.Attributes.LogLevels
import akka.stream.Supervision._
import akka.stream.testkit.{ AkkaSpec, ScriptedTest }
import akka.stream.javadsl
import akka.stream.{ ActorMaterializer, Materializer, Attributes }
import akka.testkit.TestProbe
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.control.NoStackTrace
class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest {
@ -141,6 +145,13 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest {
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-6e] Upstream failed, cause: FlowLogSpec$TestException: Boom!"))
}
"follow supervision strategy when exception thrown" in {
val ex = new RuntimeException() with NoStackTrace
val future = Source(1 to 5).log("hi", n throw ex)
.withAttributes(supervisionStrategy(resumingDecider)).runWith(Sink.fold(0)(_ + _))
Await.result(future, 500.millis) shouldEqual 0
}
}
"on javadsl.Source" must {

View file

@ -8,14 +8,11 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.actor._
import akka.dispatch.Dispatchers
import akka.pattern.ask
import akka.stream.actor.ActorSubscriber
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule }
import akka.stream.impl.io.SslTlsCipherActor
import akka.stream._
import akka.stream.io.SslTls.TlsModule
import akka.stream.stage.Stage
import akka.stream.Attributes._
import org.reactivestreams._
import scala.concurrent.duration.FiniteDuration

View file

@ -140,7 +140,7 @@ private[stream] object Stages {
}
final case class Log[T](name: String, extract: T Any, loggingAdapter: Option[LoggingAdapter], attributes: Attributes = log) extends SymbolicStage[T, T] {
override def create(attr: Attributes): Stage[T, T] = fusing.Log(name, extract, loggingAdapter)
override def create(attr: Attributes): Stage[T, T] = fusing.Log(name, extract, loggingAdapter, supervision(attr))
}
final case class Filter[T](p: T Boolean, attributes: Attributes = filter) extends SymbolicStage[T, T] {

View file

@ -686,7 +686,9 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
/**
* INTERNAL API
*/
private[akka] final case class Log[T](name: String, extract: T Any, logAdapter: Option[LoggingAdapter]) extends PushStage[T, T] {
private[akka] final case class Log[T](name: String, extract: T Any,
logAdapter: Option[LoggingAdapter],
decider: Supervision.Decider) extends PushStage[T, T] {
import Log._
@ -744,6 +746,7 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
/**