diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala index f6d9f0b06f..871822ac1d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala @@ -4,10 +4,10 @@ package akka.stream.scaladsl import akka.event.{ DummyClassForStringSources, Logging } -import akka.stream._ -import akka.stream.OperationAttributes import akka.stream.OperationAttributes.LogLevels import akka.stream.testkit.{ AkkaSpec, ScriptedTest } +import akka.stream.javadsl +import akka.stream.{ ActorFlowMaterializer, FlowMaterializer, OperationAttributes } import akka.testkit.TestProbe import scala.util.control.NoStackTrace @@ -24,8 +24,9 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest { "A Log" must { - val LogSrc = s"akka.stream.Log(akka://${Logging.simpleName(classOf[FlowLogSpec])})" - val LogClazz = classOf[DummyClassForStringSources] + val supervisorPath = "akka://FlowLogSpec/user/$a" + val LogSrc = s"akka.stream.Log($supervisorPath)" + val LogClazz = classOf[FlowMaterializer] "on Flow" must { diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 2c3aef362b..12807046e2 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -157,6 +157,9 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { */ private[akka] def system: ActorSystem + /** INTERNAL API */ + private[akka] def supervisor: ActorRef + } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 21772f5dc2..6f1ab826bf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -4,22 +4,17 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel -import akka.event.Logging -import akka.event.LoggingAdapter +import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.OperationAttributes.LogLevels -import akka.stream.impl.FixedSizeBuffer -import akka.stream.impl.ReactiveStreamsCompliance +import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ -import akka.stream.Supervision -import akka.stream._ +import akka.stream.{ Supervision, _ } import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.Future import scala.util.control.NonFatal -import scala.util.Failure -import scala.util.Success -import scala.util.Try +import scala.util.{ Failure, Success, Try } /** * INTERNAL API @@ -540,7 +535,15 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels) log = logAdapter match { case Some(l) ⇒ l - case _ ⇒ Logging(ctx.materializer.asInstanceOf[ActorFlowMaterializer].system, DefaultLoggerName) + case _ ⇒ + val mat = try ActorFlowMaterializer.downcast(ctx.materializer) + catch { + case ex: Exception ⇒ + throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorFlowMaterializer! " + + "Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex) + } + + Logging(mat.system, ctx)(fromLifecycleContext) } } @@ -583,8 +586,26 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt * INTERNAL API */ private[akka] object Log { - private final val DefaultLoggerName = "akka.stream.Log" + /** + * Must be located here to be visible for implicit resolution, when LifecycleContext is passed to [[Logging]] + * More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source. + */ + final val fromLifecycleContext = new LogSource[LifecycleContext] { + + // do not expose private context classes (of OneBoundedInterpreter) + override def getClazz(t: LifecycleContext): Class[_] = classOf[FlowMaterializer] + + override def genString(t: LifecycleContext): String = { + try s"$DefaultLoggerName(${ActorFlowMaterializer.downcast(t.materializer).supervisor.path})" + catch { + case ex: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName) + } + } + + } + + private final val DefaultLoggerName = "akka.stream.Log" private final val OffInt = LogLevels.Off.asInt private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 2e58a33ca0..29cce67dbf 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -3,7 +3,8 @@ */ package akka.stream.stage -import akka.stream.{ FlowMaterializer, OperationAttributes, Supervision } +import akka.event.{ Logging, LogSource } +import akka.stream.{ ActorFlowMaterializer, FlowMaterializer, OperationAttributes, Supervision } /** * General interface for stream transformation.