Behaviors.logMessages implementation #26226
This commit is contained in:
parent
e0debaa9c4
commit
75be2c40c9
6 changed files with 247 additions and 3 deletions
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.actor.typed.javadsl;
|
||||
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.event.Logging;
|
||||
import akka.japi.pf.PFBuilder;
|
||||
|
|
@ -72,4 +73,22 @@ public class ActorLoggingTest extends JUnitSuite {
|
|||
testKit.spawn(behavior);
|
||||
eventFilter.awaitDone(FiniteDuration.create(3, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void logMessagesBehavior() {
|
||||
Behavior<String> behavior = Behaviors.logMessages(Behaviors.empty());
|
||||
|
||||
CustomEventFilter eventFilter =
|
||||
new CustomEventFilter(
|
||||
new PFBuilder<Logging.LogEvent, Object>()
|
||||
.match(
|
||||
Logging.LogEvent.class,
|
||||
(event) -> event.message().equals("received message Hello"))
|
||||
.build(),
|
||||
1);
|
||||
|
||||
ActorRef<String> ref = testKit.spawn(behavior);
|
||||
ref.tell("Hello");
|
||||
eventFilter.awaitDone(FiniteDuration.create(3, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed
|
||||
|
||||
import akka.actor
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.event.Logging
|
||||
import akka.testkit.EventFilter
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
class LogMessagesSpec extends ScalaTestWithActorTestKit("""
|
||||
akka.loglevel = DEBUG # test verifies debug
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
""") with WordSpecLike {
|
||||
|
||||
implicit val untyped: actor.ActorSystem = system.toUntyped
|
||||
|
||||
"The log messages behavior" should {
|
||||
"log signals" in {
|
||||
val behavior: Behavior[Signal] = Behaviors.logMessages(Behaviors.empty)
|
||||
|
||||
val ref: ActorRef[Signal] = spawn(behavior)
|
||||
|
||||
EventFilter.debug("received signal PostStop", source = ref.path.toString, occurrences = 1).intercept {
|
||||
testKit.stop(ref)
|
||||
}
|
||||
}
|
||||
|
||||
"log messages" in {
|
||||
val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.empty)
|
||||
|
||||
val ref: ActorRef[String] = spawn(behavior)
|
||||
|
||||
EventFilter.debug("received message Hello", source = ref.path.toString, occurrences = 1).intercept {
|
||||
ref ! "Hello"
|
||||
}
|
||||
}
|
||||
|
||||
"log messages with provided log level" in {
|
||||
val opts = LogOptions().withLevel(Logging.InfoLevel)
|
||||
val behavior: Behavior[String] = Behaviors.logMessages(opts, Behaviors.empty)
|
||||
|
||||
val ref: ActorRef[String] = spawn(behavior)
|
||||
|
||||
EventFilter.info("received message Hello", source = ref.path.toString, occurrences = 1).intercept {
|
||||
ref ! "Hello"
|
||||
}
|
||||
}
|
||||
|
||||
"log messages with provided logger" in {
|
||||
val logger = system.log
|
||||
val opts = LogOptions().withLogger(logger)
|
||||
val behavior: Behavior[String] = Behaviors.logMessages(opts, Behaviors.empty)
|
||||
|
||||
val ref: ActorRef[String] = spawn(behavior)
|
||||
|
||||
EventFilter.debug("received message Hello", source = "LogMessagesSpec", occurrences = 1).intercept {
|
||||
ref ! "Hello"
|
||||
}
|
||||
}
|
||||
|
||||
"not log messages when not enabled" in {
|
||||
val opts = LogOptions().withEnabled(false)
|
||||
val behavior: Behavior[String] = Behaviors.logMessages(opts, Behaviors.empty)
|
||||
|
||||
val ref: ActorRef[String] = spawn(behavior)
|
||||
|
||||
EventFilter.debug("received message Hello", source = ref.path.toString, occurrences = 0).intercept {
|
||||
ref ! "Hello"
|
||||
}
|
||||
}
|
||||
|
||||
"log messages with decorated MDC values" in {
|
||||
val behavior = Behaviors.withMdc[String](Map("mdc" -> true))(Behaviors.logMessages(Behaviors.empty))
|
||||
|
||||
val ref = spawn(behavior)
|
||||
EventFilter.custom({
|
||||
case logEvent if logEvent.level == Logging.DebugLevel ⇒
|
||||
logEvent.message should ===("received message Hello")
|
||||
logEvent.mdc should ===(Map("mdc" -> true))
|
||||
true
|
||||
case other ⇒ system.log.error(s"Unexpected log event: {}", other); false
|
||||
}, occurrences = 1).intercept {
|
||||
ref ! "Hello"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4,7 +4,10 @@
|
|||
|
||||
package akka.actor.typed
|
||||
|
||||
import java.util.Optional
|
||||
|
||||
import akka.annotation.{ DoNotInherit, InternalApi }
|
||||
import akka.event.Logging
|
||||
import akka.event.Logging._
|
||||
|
||||
/**
|
||||
|
|
@ -41,6 +44,75 @@ object LogMarker {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Logging options when using `Behaviors.logMessages`.
|
||||
*/
|
||||
@DoNotInherit
|
||||
abstract sealed class LogOptions {
|
||||
/**
|
||||
* User control whether messages are logged or not. This is useful when you want to have an application configuration
|
||||
* to control when to log messages.
|
||||
*/
|
||||
def withEnabled(enabled: Boolean): LogOptions
|
||||
|
||||
/**
|
||||
* The [[akka.event.Logging.LogLevel]] to use when logging messages.
|
||||
*/
|
||||
def withLevel(level: LogLevel): LogOptions
|
||||
|
||||
/**
|
||||
* A [[akka.actor.typed.Logger]] to use when logging messages.
|
||||
*/
|
||||
def withLogger(logger: Logger): LogOptions
|
||||
|
||||
def enabled: Boolean
|
||||
def level: LogLevel
|
||||
def logger: Option[Logger]
|
||||
/** Java API */
|
||||
def getLogger: Optional[Logger]
|
||||
}
|
||||
|
||||
/**
|
||||
* Factories for log options
|
||||
*/
|
||||
object LogOptions {
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] final case class LogOptionsImpl(enabled: Boolean, level: LogLevel, logger: Option[Logger])
|
||||
extends LogOptions {
|
||||
/**
|
||||
* User control whether messages are logged or not. This is useful when you want to have an application configuration
|
||||
* to control when to log messages.
|
||||
*/
|
||||
override def withEnabled(enabled: Boolean): LogOptions = this.copy(enabled = enabled)
|
||||
|
||||
/**
|
||||
* The [[akka.event.Logging.LogLevel]] to use when logging messages.
|
||||
*/
|
||||
override def withLevel(level: LogLevel): LogOptions = this.copy(level = level)
|
||||
|
||||
/**
|
||||
* A [[akka.actor.typed.Logger]] to use when logging messages.
|
||||
*/
|
||||
override def withLogger(logger: Logger): LogOptions = this.copy(logger = Option(logger))
|
||||
|
||||
/** Java API */
|
||||
override def getLogger: Optional[Logger] = Optional.ofNullable(logger.orNull)
|
||||
}
|
||||
|
||||
/**
|
||||
* Scala API: Create a new log options with defaults.
|
||||
*/
|
||||
def apply(): LogOptions = LogOptionsImpl(enabled = true, Logging.DebugLevel, None)
|
||||
|
||||
/**
|
||||
* Java API: Create a new log options.
|
||||
*/
|
||||
def create(): LogOptions = apply()
|
||||
}
|
||||
|
||||
/**
|
||||
* Logging API provided inside of actors through the actor context.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -6,8 +6,9 @@ package akka.actor.typed.internal
|
|||
|
||||
import akka.actor.typed
|
||||
import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior }
|
||||
import akka.actor.typed.LogOptions.LogOptionsImpl
|
||||
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
|
||||
import akka.actor.typed.{ ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal, TypedActorContext }
|
||||
import akka.actor.typed.{ LogOptions, _ }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.LineNumbers
|
||||
|
||||
|
|
@ -126,6 +127,35 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Log all messages for this decorated ReceiveTarget[T] to logger before receiving it ourselves.
|
||||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) extends BehaviorInterceptor[T, T] {
|
||||
|
||||
import BehaviorInterceptor._
|
||||
|
||||
override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
if (opts.enabled)
|
||||
opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received message {}", msg)
|
||||
target(ctx, msg)
|
||||
}
|
||||
|
||||
override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
if (opts.enabled)
|
||||
opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received signal {}", signal)
|
||||
target(ctx, signal)
|
||||
}
|
||||
|
||||
// only once in the same behavior stack
|
||||
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match {
|
||||
case LogMessagesInterceptor(`opts`) ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -7,9 +7,8 @@ package akka.actor.typed.javadsl
|
|||
import java.util.Collections
|
||||
import java.util.function.{ Function ⇒ JFunction }
|
||||
|
||||
import akka.actor.typed.{ ActorRef, Behavior, BehaviorInterceptor, Signal, SupervisorStrategy }
|
||||
import akka.actor.typed._
|
||||
import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl, WithMdcBehaviorInterceptor }
|
||||
import akka.actor.typed.scaladsl
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.japi.function.{ Function2 ⇒ JapiFunction2 }
|
||||
import akka.japi.pf.PFBuilder
|
||||
|
|
@ -185,6 +184,22 @@ object Behaviors {
|
|||
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
|
||||
scaladsl.Behaviors.monitor(monitor, behavior)
|
||||
|
||||
/**
|
||||
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
|
||||
* [[akka.actor.typed.LogOptions]] default configuration before invoking the wrapped behavior.
|
||||
* To include an MDC context then first wrap `logMessages` with `withMDC`.
|
||||
*/
|
||||
def logMessages[T](behavior: Behavior[T]): Behavior[T] =
|
||||
scaladsl.Behaviors.logMessages(behavior)
|
||||
|
||||
/**
|
||||
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
|
||||
* [[akka.actor.typed.LogOptions]] configuration before invoking the wrapped behavior.
|
||||
* To include an MDC context then first wrap `logMessages` with `withMDC`.
|
||||
*/
|
||||
def logMessages[T](logOptions: LogOptions, behavior: Behavior[T]): Behavior[T] =
|
||||
scaladsl.Behaviors.logMessages(logOptions, behavior)
|
||||
|
||||
/**
|
||||
* Wrap the given behavior such that it is restarted (i.e. reset to its
|
||||
* initial state) whenever it throws an exception of the given class or a
|
||||
|
|
|
|||
|
|
@ -157,6 +157,22 @@ object Behaviors {
|
|||
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
|
||||
BehaviorImpl.intercept(new MonitorInterceptor[T](monitor))(behavior)
|
||||
|
||||
/**
|
||||
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
|
||||
* [[akka.actor.typed.LogOptions]] default configuration before invoking the wrapped behavior.
|
||||
* To include an MDC context then first wrap `logMessages` with `withMDC`.
|
||||
*/
|
||||
def logMessages[T](behavior: Behavior[T]): Behavior[T] =
|
||||
BehaviorImpl.intercept(new LogMessagesInterceptor[T](LogOptions()))(behavior)
|
||||
|
||||
/**
|
||||
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
|
||||
* [[akka.actor.typed.LogOptions]] configuration before invoking the wrapped behavior.
|
||||
* To include an MDC context then first wrap `logMessages` with `withMDC`.
|
||||
*/
|
||||
def logMessages[T](logOptions: LogOptions, behavior: Behavior[T]): Behavior[T] =
|
||||
BehaviorImpl.intercept(new LogMessagesInterceptor[T](logOptions))(behavior)
|
||||
|
||||
/**
|
||||
* Wrap the given behavior with the given [[SupervisorStrategy]] for
|
||||
* the given exception.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue