+str #17298 include stream supervisor in log() source

This commit is contained in:
Konrad Malawski 2015-05-27 00:27:05 +02:00
parent 454a393af1
commit 778e6ce3d2
4 changed files with 42 additions and 16 deletions

View file

@ -4,10 +4,10 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.event.{ DummyClassForStringSources, Logging } import akka.event.{ DummyClassForStringSources, Logging }
import akka.stream._
import akka.stream.OperationAttributes
import akka.stream.OperationAttributes.LogLevels import akka.stream.OperationAttributes.LogLevels
import akka.stream.testkit.{ AkkaSpec, ScriptedTest } import akka.stream.testkit.{ AkkaSpec, ScriptedTest }
import akka.stream.javadsl
import akka.stream.{ ActorFlowMaterializer, FlowMaterializer, OperationAttributes }
import akka.testkit.TestProbe import akka.testkit.TestProbe
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -24,8 +24,9 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest {
"A Log" must { "A Log" must {
val LogSrc = s"akka.stream.Log(akka://${Logging.simpleName(classOf[FlowLogSpec])})" val supervisorPath = "akka://FlowLogSpec/user/$a"
val LogClazz = classOf[DummyClassForStringSources] val LogSrc = s"akka.stream.Log($supervisorPath)"
val LogClazz = classOf[FlowMaterializer]
"on Flow" must { "on Flow" must {

View file

@ -157,6 +157,9 @@ abstract class ActorFlowMaterializer extends FlowMaterializer {
*/ */
private[akka] def system: ActorSystem private[akka] def system: ActorSystem
/** INTERNAL API */
private[akka] def supervisor: ActorRef
} }
/** /**

View file

@ -4,22 +4,17 @@
package akka.stream.impl.fusing package akka.stream.impl.fusing
import akka.event.Logging.LogLevel import akka.event.Logging.LogLevel
import akka.event.Logging import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.event.LoggingAdapter
import akka.stream.OperationAttributes.LogLevels import akka.stream.OperationAttributes.LogLevels
import akka.stream.impl.FixedSizeBuffer import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance }
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.Supervision import akka.stream.{ Supervision, _ }
import akka.stream._
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.Failure import scala.util.{ Failure, Success, Try }
import scala.util.Success
import scala.util.Try
/** /**
* INTERNAL API * INTERNAL API
@ -540,7 +535,15 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels) logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels)
log = logAdapter match { log = logAdapter match {
case Some(l) l case Some(l) l
case _ Logging(ctx.materializer.asInstanceOf[ActorFlowMaterializer].system, DefaultLoggerName) case _
val mat = try ActorFlowMaterializer.downcast(ctx.materializer)
catch {
case ex: Exception
throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorFlowMaterializer! " +
"Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex)
}
Logging(mat.system, ctx)(fromLifecycleContext)
} }
} }
@ -583,8 +586,26 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
* INTERNAL API * INTERNAL API
*/ */
private[akka] object Log { private[akka] object Log {
private final val DefaultLoggerName = "akka.stream.Log"
/**
* Must be located here to be visible for implicit resolution, when LifecycleContext is passed to [[Logging]]
* More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source.
*/
final val fromLifecycleContext = new LogSource[LifecycleContext] {
// do not expose private context classes (of OneBoundedInterpreter)
override def getClazz(t: LifecycleContext): Class[_] = classOf[FlowMaterializer]
override def genString(t: LifecycleContext): String = {
try s"$DefaultLoggerName(${ActorFlowMaterializer.downcast(t.materializer).supervisor.path})"
catch {
case ex: Exception LogSource.fromString.genString(DefaultLoggerName)
}
}
}
private final val DefaultLoggerName = "akka.stream.Log"
private final val OffInt = LogLevels.Off.asInt private final val OffInt = LogLevels.Off.asInt
private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
} }

View file

@ -3,7 +3,8 @@
*/ */
package akka.stream.stage package akka.stream.stage
import akka.stream.{ FlowMaterializer, OperationAttributes, Supervision } import akka.event.{ Logging, LogSource }
import akka.stream.{ ActorFlowMaterializer, FlowMaterializer, OperationAttributes, Supervision }
/** /**
* General interface for stream transformation. * General interface for stream transformation.