+slf,act #21671 allow using Markers with LoggingAdapter (#21707)

* +slf,act #21671 allow using Markers with LoggingAdapter

* +slf,act #21671 allow using Markers with LoggingAdapter

* improve compatibility

* add docs

* address review comments

* actually print the marker
Konrad Malawski 2016-10-27 15:07:59 +02:00 committed by GitHub
parent 650abe19d6
commit 3951cf4e68
13 changed files with 1420 additions and 80 deletions


@@ -5,11 +5,13 @@ package akka.event
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import java.util.logging
import akka.actor.ActorSystem.Settings
import akka.actor._
import akka.dispatch.RequiresMessageQueue
import akka.util.ReentrantGuard
import akka.event.Logging.{ Extension ⇒ _, _ }
import akka.util.{ OptionVal, ReentrantGuard }
import akka.util.Helpers.toRootLowerCase
import akka.{ AkkaException, ConfigurationException }
@@ -379,14 +381,12 @@ object Logging {
/**
* Returns a 'safe' getSimpleName for the provided object's Class
* @param obj
* @return the simple name of the given object's Class
*/
def simpleName(obj: AnyRef): String = simpleName(obj.getClass)
/**
* Returns a 'safe' getSimpleName for the provided Class
* @param clazz
* @return the simple name of the given Class
*/
def simpleName(clazz: Class[_]): String = {
@@ -511,6 +511,30 @@ object Logging {
val (str, clazz) = LogSource(logSource, system)
new BusLogging(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter)
}
/**
* Obtain LoggingAdapter with additional "marker" support (which some logging frameworks are able to utilise)
* for the given actor system and source object. This will use the system's event stream and include the system's
* address in the log source string.
*
* <b>Do not use this if you want to supply a log category string (like
* com.example.app.whatever) unaltered,</b> supply `system.eventStream` in this
* case or use
*
* {{{
* Logging(system, this.getClass)
* }}}
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*
* You can add your own rules quite easily, see [[akka.event.LogSource]].
*/
def withMarker[T: LogSource](system: ActorSystem, logSource: T): MarkerLoggingAdapter = {
val (str, clazz) = LogSource(logSource, system)
new MarkerLoggingAdapter(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter)
}
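
As a quick orientation, a minimal usage sketch of this factory (illustrative only, not part of the diff; the system name and marker are placeholders):

import akka.actor.ActorSystem
import akka.event.{ LogMarker, Logging }

object WithMarkerSketch extends App {
  val system = ActorSystem("example")
  // MarkerLoggingAdapter offers the usual log methods plus overloads taking a LogMarker as first argument
  val log = Logging.withMarker(system, this.getClass)
  log.warning(LogMarker("SECURITY"), "failed login for user {}", "jdoe")
  system.terminate()
}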
/**
* Obtain LoggingAdapter for the given logging bus and source object.
@@ -530,6 +554,24 @@ object Logging {
val (str, clazz) = LogSource(logSource)
new BusLogging(bus, str, clazz)
}
/**
* Obtain LoggingAdapter for the given logging bus and source object.
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*
* You can add your own rules quite easily, see [[akka.event.LogSource]].
*
* Note that this `LoggingAdapter` will use the [[akka.event.DefaultLoggingFilter]],
* and not the [[akka.event.LoggingFilter]] configured for the system
* (if different from `DefaultLoggingFilter`).
*/
def withMarker[T: LogSource](bus: LoggingBus, logSource: T): MarkerLoggingAdapter = {
val (str, clazz) = LogSource(logSource)
new MarkerLoggingAdapter(bus, str, clazz)
}
/**
* Obtain LoggingAdapter with MDC support for the given actor.
@@ -540,6 +582,15 @@ object Logging {
val system = logSource.context.system.asInstanceOf[ExtendedActorSystem]
new BusLogging(system.eventStream, str, clazz, system.logFilter) with DiagnosticLoggingAdapter
}
/**
* Obtain LoggingAdapter with marker and MDC support for the given actor.
* Don't use it outside its specific Actor as it isn't thread safe
*/
def withMarker(logSource: Actor): DiagnosticMarkerBusLoggingAdapter = {
val (str, clazz) = LogSource(logSource)
val system = logSource.context.system.asInstanceOf[ExtendedActorSystem]
new DiagnosticMarkerBusLoggingAdapter(system.eventStream, str, clazz, system.logFilter)
}
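
A sketch of this actor-scoped variant, combining a marker with MDC values (the actor and all names are illustrative):

import akka.actor.Actor
import akka.event.{ LogMarker, Logging }

class AuditedActor extends Actor {
  // DiagnosticMarkerBusLoggingAdapter: markers plus per-actor MDC; use only from within this actor
  val log = Logging.withMarker(this)
  val Audit = LogMarker("AUDIT")

  def receive = {
    case msg ⇒
      log.mdc(Map("messageClass" -> msg.getClass.getName))
      log.info(Audit, "handled {}", msg)
      log.clearMDC()
  }
}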
/**
* Obtain LoggingAdapter for the given actor system and source object. This
@@ -674,6 +725,14 @@ object Logging {
case InfoLevel ⇒ Info(logSource, logClass, message, mdc)
case DebugLevel ⇒ Debug(logSource, logClass, message, mdc)
}
def apply(level: LogLevel, logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker): LogEvent = level match {
case ErrorLevel ⇒ Error(logSource, logClass, message, mdc, marker)
case WarningLevel ⇒ Warning(logSource, logClass, message, mdc, marker)
case InfoLevel ⇒ Info(logSource, logClass, message, mdc, marker)
case DebugLevel ⇒ Debug(logSource, logClass, message, mdc, marker)
}
}
/**
@@ -686,11 +745,26 @@ object Logging {
class Error2(cause: Throwable, logSource: String, logClass: Class[_], message: Any = "", override val mdc: MDC) extends Error(cause, logSource, logClass, message) {
def this(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = this(Error.NoCause, logSource, logClass, message, mdc)
}
class Error3(cause: Throwable, logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker)
extends Error2(cause, logSource, logClass, message, mdc) with LogEventWithMarker {
def this(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = this(Error.NoCause, logSource, logClass, message, mdc, marker)
}
object Error {
def apply(logSource: String, logClass: Class[_], message: Any) = new Error(NoCause, logSource, logClass, message)
def apply(cause: Throwable, logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Error2(cause, logSource, logClass, message, mdc)
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Error2(NoCause, logSource, logClass, message, mdc)
def apply(logSource: String, logClass: Class[_], message: Any) =
new Error(NoCause, logSource, logClass, message)
def apply(logSource: String, logClass: Class[_], message: Any, marker: LogMarker) =
new Error3(NoCause, logSource, logClass, message, Map.empty, marker)
def apply(cause: Throwable, logSource: String, logClass: Class[_], message: Any, mdc: MDC) =
new Error2(cause, logSource, logClass, message, mdc)
def apply(cause: Throwable, logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) =
new Error3(cause, logSource, logClass, message, mdc, marker)
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) =
new Error2(NoCause, logSource, logClass, message, mdc)
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) =
new Error3(NoCause, logSource, logClass, message, mdc, marker)
/** Null Object used for errors without cause Throwable */
object NoCause extends NoStackTrace
@@ -704,8 +778,11 @@ object Logging {
override def level = WarningLevel
}
class Warning2(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC) extends Warning(logSource, logClass, message)
class Warning3(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker)
extends Warning2(logSource, logClass, message, mdc) with LogEventWithMarker
object Warning {
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Warning2(logSource, logClass, message, mdc)
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = new Warning3(logSource, logClass, message, mdc, marker)
}
/**
@@ -715,8 +792,11 @@ object Logging {
override def level = InfoLevel
}
class Info2(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC) extends Info(logSource, logClass, message)
class Info3(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker)
extends Info2(logSource, logClass, message, mdc) with LogEventWithMarker
object Info {
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Info2(logSource, logClass, message, mdc)
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = new Info3(logSource, logClass, message, mdc, marker)
}
/**
@@ -726,8 +806,21 @@ object Logging {
override def level = DebugLevel
}
class Debug2(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC) extends Debug(logSource, logClass, message)
class Debug3(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker)
extends Debug2(logSource, logClass, message, mdc) with LogEventWithMarker
object Debug {
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Debug2(logSource, logClass, message, mdc)
def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = new Debug3(logSource, logClass, message, mdc, marker)
}
/** INTERNAL API, Marker interface for LogEvents containing Markers, which can be set for example on an slf4j logger */
sealed trait LogEventWithMarker extends LogEvent {
def marker: LogMarker
/** Appends the marker to the Debug/Info/Warning/Error toString representations */
override def toString = {
val s = super.toString
s.substring(0, s.length - 1) + "," + marker + ")"
}
}
/**
@@ -766,16 +859,22 @@ object Logging {
class LoggerInitializationException(msg: String) extends AkkaException(msg)
trait StdOutLogger {
import StdOutLogger._
import java.text.SimpleDateFormat
import java.util.Date
private val date = new Date()
private val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.SSS")
private val errorFormat = "[ERROR] [%s] [%s] [%s] %s%s"
private val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s"
private val warningFormat = "[WARN] [%s] [%s] [%s] %s"
private val infoFormat = "[INFO] [%s] [%s] [%s] %s"
private val debugFormat = "[DEBUG] [%s] [%s] [%s] %s"
// format: OFF
// FIXME: remove those when we have the chance to break binary compatibility
private val errorFormat = ErrorFormat
private val errorFormatWithoutCause = ErrorFormatWithoutCause
private val warningFormat = WarningFormat
private val infoFormat = InfoFormat
private val debugFormat = DebugFormat
// format: ON
def timestamp(event: LogEvent): String = synchronized {
date.setTime(event.timestamp)
@@ -790,36 +889,92 @@ object Logging {
case e ⇒ warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e))
}
def error(event: Error): Unit = {
val f = if (event.cause == Error.NoCause) errorFormatWithoutCause else errorFormat
println(f.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message,
stackTraceFor(event.cause)))
def error(event: Error): Unit = event match {
case e: Error3 ⇒ // has marker
val f = if (event.cause == Error.NoCause) ErrorWithoutCauseWithMarkerFormat else ErrorFormatWithMarker
println(f.format(
e.marker.name,
timestamp(event),
event.thread.getName,
event.logSource,
event.message,
stackTraceFor(event.cause)))
case _ ⇒
val f = if (event.cause == Error.NoCause) ErrorFormatWithoutCause else ErrorFormat
println(f.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message,
stackTraceFor(event.cause)))
}
def warning(event: Warning): Unit =
println(warningFormat.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
def warning(event: Warning): Unit = event match {
case e: Warning3 ⇒ // has marker
println(WarningWithMarkerFormat.format(
e.marker.name,
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
case _ ⇒
println(WarningFormat.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
}
def info(event: Info): Unit =
println(infoFormat.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
def info(event: Info): Unit = event match {
case e: Info3 ⇒ // has marker
println(InfoWithMarkerFormat.format(
e.marker.name,
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
case _ ⇒
println(InfoFormat.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
}
def debug(event: Debug): Unit =
println(debugFormat.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
def debug(event: Debug): Unit = event match {
case e: Debug3 ⇒ // has marker
println(DebugWithMarkerFormat.format(
e.marker.name,
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
case _ ⇒
println(DebugFormat.format(
timestamp(event),
event.thread.getName,
event.logSource,
event.message))
}
}
object StdOutLogger {
// format: OFF
private final val ErrorFormat = "[ERROR] [%s] [%s] [%s] %s%s"
private final val ErrorFormatWithMarker = "[ERROR] [%s][%s] [%s] [%s] %s%s"
private final val ErrorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s"
private final val ErrorWithoutCauseWithMarkerFormat = "[ERROR] [%s][%s] [%s] [%s] %s"
private final val WarningFormat = "[WARN] [%s] [%s] [%s] %s"
private final val WarningWithMarkerFormat = "[WARN] [%s][%s] [%s] [%s] %s"
private final val InfoFormat = "[INFO] [%s] [%s] [%s] %s"
private final val InfoWithMarkerFormat = "[INFO] [%s][%s] [%s] [%s] %s"
private final val DebugFormat = "[DEBUG] [%s] [%s] [%s] %s"
private final val DebugWithMarkerFormat = "[DEBUG] [%s][%s] [%s] [%s] %s"
// format: ON
}
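
To make the new marker formats concrete, the marker name is rendered as an additional bracketed prefix; roughly (placeholder values, mirroring WarningWithMarkerFormat):

object StdOutFormatSketch extends App {
  // arguments: marker name, timestamp, thread, log source, message
  println("[WARN] [%s][%s] [%s] [%s] %s".format(
    "SECURITY", "10/27/2016 15:07:59.123", "example-akka.actor.default-dispatcher-3",
    "akka://example/user/gate", "failed login for user jdoe"))
  // prints: [WARN] [SECURITY][10/27/2016 15:07:59.123] [example-akka.actor.default-dispatcher-3] [akka://example/user/gate] failed login for user jdoe
}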
/**
@@ -1228,6 +1383,249 @@ trait DiagnosticLoggingAdapter extends LoggingAdapter {
def clearMDC(): Unit = mdc(emptyMDC)
}
final class LogMarker(val name: String)
object LogMarker {
/** The Marker is internally transferred via MDC using this key */
private[akka] final val MDCKey = "marker"
def apply(name: String): LogMarker = new LogMarker(name)
/** Java API */
def create(name: String): LogMarker = apply(name)
def extractFromMDC(mdc: MDC): Option[String] =
mdc.get(MDCKey) match {
case Some(v) ⇒ Some(v.toString)
case None ⇒ None
}
}
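
A small sketch of creating markers and of the MDC round-trip described above (the literal "marker" string mirrors MDCKey, which is private to the akka package):

import akka.event.LogMarker

object LogMarkerSketch extends App {
  val security = LogMarker("SECURITY")          // Scala factory
  val sameThing = LogMarker.create("SECURITY")  // Java API factory
  // The marker name travels through the MDC under the "marker" key,
  // so it can be recovered from a plain MDC map:
  println(LogMarker.extractFromMDC(Map("marker" -> security.name))) // Some(SECURITY)
  println(LogMarker.extractFromMDC(Map.empty))                      // None
}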
/**
* [[LoggingAdapter]] extension which adds Marker support.
* Only recommended to be used within Actors as it isn't thread safe.
*/
class MarkerLoggingAdapter(
override val bus: LoggingBus,
override val logSource: String,
override val logClass: Class[_],
loggingFilter: LoggingFilter)
extends BusLogging(bus, logSource, logClass, loggingFilter) {
// TODO when breaking binary compatibility, these marker methods should become baked into LoggingAdapter itself
// For backwards compatibility, and when LoggingAdapter is created without direct
// association to an ActorSystem
def this(bus: LoggingBus, logSource: String, logClass: Class[_]) =
this(bus, logSource, logClass, new DefaultLoggingFilter(() ⇒ bus.logLevel))
/**
* Log message at error level, including the exception that caused the error.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, cause: Throwable, message: String): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, message, mdc, marker))
/**
* Message template with 1 replacement argument.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1), mdc, marker))
/**
* Message template with 2 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any, arg2: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2), mdc, marker))
/**
* Message template with 3 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker))
/**
* Message template with 4 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker))
/**
* Log message at error level, without providing the exception that caused the error.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, message: String): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, message, mdc, marker))
/**
* Message template with 1 replacement argument.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, template: String, arg1: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1), mdc, marker))
/**
* Message template with 2 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2), mdc, marker))
/**
* Message template with 3 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker))
/**
* Message template with 4 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def error(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit =
if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker))
/**
* Log message at warning level.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def warning(marker: LogMarker, message: String): Unit =
if (isWarningEnabled) bus.publish(Warning(logSource, logClass, message, mdc, marker))
/**
* Message template with 1 replacement argument.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def warning(marker: LogMarker, template: String, arg1: Any): Unit =
if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1), mdc, marker))
/**
* Message template with 2 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def warning(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit =
if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1, arg2), mdc, marker))
/**
* Message template with 3 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def warning(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker))
/**
* Message template with 4 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def warning(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit =
if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker))
/**
* Log message at info level.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def info(marker: LogMarker, message: String): Unit =
if (isInfoEnabled) bus.publish(Info(logSource, logClass, message, mdc, marker))
/**
* Message template with 1 replacement argument.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def info(marker: LogMarker, template: String, arg1: Any): Unit =
if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1), mdc, marker))
/**
* Message template with 2 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def info(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit =
if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1, arg2), mdc, marker))
/**
* Message template with 3 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def info(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker))
/**
* Message template with 4 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def info(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit =
if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker))
/**
* Log message at debug level.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def debug(marker: LogMarker, message: String): Unit =
if (isDebugEnabled) bus.publish(Debug(logSource, logClass, message, mdc, marker))
/**
* Message template with 1 replacement argument.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def debug(marker: LogMarker, template: String, arg1: Any): Unit =
if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1), mdc, marker))
/**
* Message template with 2 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def debug(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit =
if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1, arg2), mdc, marker))
/**
* Message template with 3 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def debug(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker))
/**
* Message template with 4 replacement arguments.
* The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special".
* @see [[LoggingAdapter]]
*/
def debug(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit =
if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker))
}
final class DiagnosticMarkerBusLoggingAdapter(
override val bus: LoggingBus,
override val logSource: String,
override val logClass: Class[_],
loggingFilter: LoggingFilter)
extends MarkerLoggingAdapter(bus, logSource, logClass, loggingFilter) with DiagnosticLoggingAdapter
/**
* [[akka.event.LoggingAdapter]] that publishes [[akka.event.Logging.LogEvent]] to event stream.
*/
@@ -1246,11 +1644,16 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class
def isInfoEnabled = loggingFilter.isInfoEnabled(logClass, logSource)
def isDebugEnabled = loggingFilter.isDebugEnabled(logClass, logSource)
protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, message, mdc))
protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, message, mdc))
protected def notifyWarning(message: String): Unit = bus.publish(Warning(logSource, logClass, message, mdc))
protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message, mdc))
protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message, mdc))
protected def notifyError(message: String): Unit =
bus.publish(Error(logSource, logClass, message, mdc))
protected def notifyError(cause: Throwable, message: String): Unit =
bus.publish(Error(cause, logSource, logClass, message, mdc))
protected def notifyWarning(message: String): Unit =
bus.publish(Warning(logSource, logClass, message, mdc))
protected def notifyInfo(message: String): Unit =
bus.publish(Info(logSource, logClass, message, mdc))
protected def notifyDebug(message: String): Unit =
bus.publish(Debug(logSource, logClass, message, mdc))
}
/**


@@ -8,13 +8,13 @@ import akka.camel.TestSupport.SharedCamelSystem
import internal.DefaultCamel
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
import org.apache.camel.{ ProducerTemplate }
import org.apache.camel.ProducerTemplate
import org.scalatest.WordSpec
import akka.event.LoggingAdapter
import akka.event.{ LoggingAdapter, MarkerLoggingAdapter }
import akka.actor.ActorSystem.Settings
import com.typesafe.config.ConfigFactory
import org.apache.camel.impl.DefaultCamelContext
import akka.actor.{ ExtendedActorSystem }
import akka.actor.ExtendedActorSystem
class DefaultCamelTest extends WordSpec with SharedCamelSystem with Matchers with MockitoSugar {
@@ -26,7 +26,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with Matchers wit
when(sys.name) thenReturn ("mocksystem")
def camelWithMocks = new DefaultCamel(sys) {
override val log = mock[LoggingAdapter]
override val log = mock[MarkerLoggingAdapter]
override lazy val template = mock[ProducerTemplate]
override lazy val context = mock[DefaultCamelContext]
override val settings = mock[CamelSettings]


@@ -8,23 +8,27 @@ import language.postfixOps
import org.scalatest.mock.MockitoSugar
import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.apache.camel.{ ProducerTemplate, AsyncCallback }
import org.apache.camel.{ AsyncCallback, ProducerTemplate }
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._
import java.lang.String
import akka.camel._
import internal.{ DefaultCamel, CamelExchangeAdapter }
import org.scalatest.{ Suite, WordSpecLike, BeforeAndAfterAll, BeforeAndAfterEach }
import internal.{ CamelExchangeAdapter, DefaultCamel }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Suite, WordSpecLike }
import akka.camel.TestSupport._
import java.util.concurrent.{ TimeoutException, CountDownLatch }
import org.mockito.{ ArgumentMatcher, Matchers ⇒ MMatchers, Mockito }
import java.util.concurrent.{ CountDownLatch, TimeoutException }
import org.mockito.{ ArgumentMatcher, Mockito, Matchers ⇒ MMatchers }
import org.scalatest.Matchers
import akka.actor.Status.{ Failure }
import akka.actor.Status.Failure
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem.Settings
import akka.event.LoggingAdapter
import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe }
import akka.event.{ LoggingAdapter, MarkerLoggingAdapter }
import akka.testkit.{ TestKit, TestLatch, TestProbe, TimingTest }
import org.apache.camel.impl.DefaultCamelContext
import scala.concurrent.{ Await, Future }
import akka.util.Timeout
import akka.actor._
@@ -350,7 +354,7 @@ private[camel] trait ActorProducerFixture extends MockitoSugar with BeforeAndAft
when(sys.name) thenReturn ("mocksystem")
def camelWithMocks = new DefaultCamel(sys) {
override val log = mock[LoggingAdapter]
override val log = mock[MarkerLoggingAdapter]
override lazy val template = mock[ProducerTemplate]
override lazy val context = mock[DefaultCamelContext]
override val settings = new CamelSettings(ConfigFactory.parseString(


@@ -70,25 +70,20 @@ trait JavaLoggingAdapter extends LoggingAdapter {
def isDebugEnabled = logger.isLoggable(logging.Level.CONFIG)
protected def notifyError(message: String) {
protected def notifyError(message: String): Unit =
log(logging.Level.SEVERE, null, message)
}
protected def notifyError(cause: Throwable, message: String) {
protected def notifyError(cause: Throwable, message: String): Unit =
log(logging.Level.SEVERE, cause, message)
}
protected def notifyWarning(message: String) {
protected def notifyWarning(message: String): Unit =
log(logging.Level.WARNING, null, message)
}
protected def notifyInfo(message: String) {
protected def notifyInfo(message: String): Unit =
log(logging.Level.INFO, null, message)
}
protected def notifyDebug(message: String) {
protected def notifyDebug(message: String): Unit =
log(logging.Level.CONFIG, null, message)
}
@inline
def log(level: logging.Level, cause: Throwable, message: String) {


@@ -434,3 +434,24 @@ Now, the values will be available in the MDC, so you can use them in the layout
</encoder>
</appender>
Using Markers
-------------
Some logging libraries allow attaching so-called "markers" to log statements, in addition to MDC data.
These are used to single out rare and special events; for example, you might mark log statements that detect
some malicious activity with a ``SECURITY`` tag, and configure your appender so that such entries immediately
trigger emails and other notifications.
Markers are available through logging adapters obtained via ``Logging.withMarker``.
The first argument passed to such log calls should then be an ``akka.event.LogMarker``.
The SLF4J bridge provided by Akka in ``akka-slf4j`` will automatically pick up this marker value and make it available to SLF4J.
For example, you could use the marker in an appender pattern like this::
<pattern>%date{ISO8601} [%marker][%level] [%msg]%n</pattern>
A more advanced (including most Akka added information) example pattern would be::
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
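
On the code side, a minimal (Scala) sketch of tagging a statement with a marker; ``system`` is your ``ActorSystem`` and the ``SECURITY`` marker name is illustrative::

    val log = Logging.withMarker(system, this.getClass)
    log.warning(LogMarker("SECURITY"), "suspicious activity from {}", "10.0.0.1")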


@@ -446,7 +446,7 @@ get it you will use the factory receiving an Actor as logSource:
val log: DiagnosticLoggingAdapter = Logging(this);
Once you have the logger, you just need to add the custom values before you log something.
This way, the values will be put in the SLF4J MDC right before appending the log and removed after.
This way, the values will be put in the SLF4J MDC right before appending the log and removed after.
.. note::
@@ -473,3 +473,24 @@ Now, the values will be available in the MDC, so you can use them in the layout
</encoder>
</appender>
Using Markers
-------------
Some logging libraries allow attaching so-called "markers" to log statements, in addition to MDC data.
These are used to single out rare and special events; for example, you might mark log statements that detect
some malicious activity with a ``SECURITY`` tag, and configure your appender so that such entries immediately
trigger emails and other notifications.
Markers are available through logging adapters obtained via ``Logging.withMarker``.
The first argument passed to such log calls should then be an ``akka.event.LogMarker``.
The SLF4J bridge provided by Akka in ``akka-slf4j`` will automatically pick up this marker value and make it available to SLF4J.
For example, you could use the marker in an appender pattern like this::
<pattern>%date{ISO8601} [%marker][%level] [%msg]%n</pattern>
A more advanced (including most Akka added information) example pattern would be::
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
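
On the code side, markers can also be combined with MDC inside an actor; a sketch (the MDC keys mirror the pattern above and are illustrative)::

    val log = Logging.withMarker(this) // DiagnosticMarkerBusLoggingAdapter: markers + MDC
    log.mdc(Map("ticketNumber" -> 3671, "ticketDesc" -> "Custom value"))
    log.info(LogMarker("SECURITY"), "received suspicious request")
    log.clearMDC()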


@@ -0,0 +1,90 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl
import java.util.{ Optional, Collection ⇒ JCollection }
import javax.net.ssl.{ SSLContext, SSLParameters }
import akka.http.scaladsl
import akka.japi.Util
import akka.stream.TLSClientAuth
import akka.http.impl.util.JavaMapping.Implicits._
import com.lightbend.sslconfig.akka.AkkaSSLConfig
import scala.compat.java8.OptionConverters
import scala.collection.JavaConverters._
object ConnectionContext {
//#https-context-creation
// ConnectionContext
/** Used to serve HTTPS traffic. */
def https(sslContext: SSLContext): HttpsConnectionContext =
scaladsl.ConnectionContext.https(sslContext)
/** Used to serve HTTPS traffic. */
def https(
sslContext: SSLContext,
sslConfig: Optional[AkkaSSLConfig],
enabledCipherSuites: Optional[JCollection[String]],
enabledProtocols: Optional[JCollection[String]],
clientAuth: Optional[TLSClientAuth],
sslParameters: Optional[SSLParameters]) =
scaladsl.ConnectionContext.https(
sslContext,
OptionConverters.toScala(sslConfig),
OptionConverters.toScala(enabledCipherSuites).map(Util.immutableSeq(_)),
OptionConverters.toScala(enabledProtocols).map(Util.immutableSeq(_)),
OptionConverters.toScala(clientAuth),
OptionConverters.toScala(sslParameters))
//#https-context-creation
/** Used to serve HTTPS traffic. */
// for binary-compatibility, since 2.4.7
def https(
sslContext: SSLContext,
enabledCipherSuites: Optional[JCollection[String]],
enabledProtocols: Optional[JCollection[String]],
clientAuth: Optional[TLSClientAuth],
sslParameters: Optional[SSLParameters]) =
scaladsl.ConnectionContext.https(
sslContext,
OptionConverters.toScala(enabledCipherSuites).map(Util.immutableSeq(_)),
OptionConverters.toScala(enabledProtocols).map(Util.immutableSeq(_)),
OptionConverters.toScala(clientAuth),
OptionConverters.toScala(sslParameters))
/** Used to serve HTTP traffic. */
def noEncryption(): HttpConnectionContext =
scaladsl.ConnectionContext.noEncryption()
}
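
A quick sketch of the factories above (the default JVM SSLContext is used only for brevity; real servers should configure key and trust material explicitly):

import javax.net.ssl.SSLContext
import akka.http.javadsl.{ ConnectionContext, HttpConnectionContext, HttpsConnectionContext }

object ConnectionContextSketch extends App {
  // Default JVM SSLContext, for illustration only
  val https: HttpsConnectionContext = ConnectionContext.https(SSLContext.getDefault)
  val plain: HttpConnectionContext = ConnectionContext.noEncryption()
  println(s"https default port: ${https.getDefaultPort}, plain default port: ${plain.getDefaultPort}") // 443, 80
}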
abstract class ConnectionContext {
def isSecure: Boolean
/** Java API */
def getDefaultPort: Int
def sslConfig: Option[AkkaSSLConfig]
}
abstract class HttpConnectionContext extends akka.http.javadsl.ConnectionContext {
override final def isSecure = false
override final def getDefaultPort = 80
override def sslConfig: Option[AkkaSSLConfig] = None
}
abstract class HttpsConnectionContext extends akka.http.javadsl.ConnectionContext {
override final def isSecure = true
override final def getDefaultPort = 443
/** Java API */
def getEnabledCipherSuites: Optional[JCollection[String]]
/** Java API */
def getEnabledProtocols: Optional[JCollection[String]]
/** Java API */
def getClientAuth: Optional[TLSClientAuth]
/** Java API */
def getSslContext: SSLContext
/** Java API */
def getSslParameters: Optional[SSLParameters]
}


@@ -0,0 +1,751 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl
import java.net.InetSocketAddress
import java.util.Optional
import akka.http.impl.util.JavaMapping
import akka.http.impl.util.JavaMapping.HttpsConnectionContext
import akka.http.javadsl.model.ws._
import akka.http.javadsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
import akka.{ NotUsed, stream }
import akka.stream.TLSProtocol._
import scala.concurrent.Future
import scala.util.Try
import akka.stream.scaladsl.Keep
import akka.japi.{ Function, Pair }
import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
import akka.event.LoggingAdapter
import akka.stream.Materializer
import akka.stream.javadsl.{ BidiFlow, Flow, Source }
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.scaladsl.{ model ⇒ sm }
import akka.http.javadsl.model._
import akka.http._
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage
import com.lightbend.sslconfig.akka.AkkaSSLConfig
object Http extends ExtensionId[Http] with ExtensionIdProvider {
override def get(system: ActorSystem): Http = super.get(system)
def lookup() = Http
def createExtension(system: ExtendedActorSystem): Http = new Http(system)
}
class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ ec }
import language.implicitConversions
private implicit def completionStageCovariant[T, U >: T](in: CompletionStage[T]): CompletionStage[U] = in.asInstanceOf[CompletionStage[U]]
private implicit def javaModelIsScalaModel[J <: AnyRef, S <: J](in: Future[J])(implicit ev: JavaMapping.Inherited[J, S]): Future[S] = in.asInstanceOf[Future[S]]
private lazy val delegate = akka.http.scaladsl.Http(system)
/**
* Constructs a server layer stage using the configured default [[akka.http.javadsl.settings.ServerSettings]]. The returned [[BidiFlow]] isn't
* reusable and can only be materialized once.
*/
def serverLayer(materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer()(materializer))
/**
* Constructs a server layer stage using the given [[akka.http.javadsl.settings.ServerSettings]]. The returned [[BidiFlow]] isn't reusable and
* can only be materialized once.
*/
def serverLayer(
settings: ServerSettings,
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer(settings.asScala)(materializer))
/**
* Constructs a server layer stage using the given [[akka.http.javadsl.settings.ServerSettings]]. The returned [[BidiFlow]] isn't reusable and
* can only be materialized once. The `remoteAddress`, if provided, will be added as a header to each [[HttpRequest]]
* this layer produces if the `akka.http.server.remote-address-header` configuration option is enabled.
*/
def serverLayer(
settings: ServerSettings,
remoteAddress: Optional[InetSocketAddress],
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala)(materializer))
/**
* Constructs a server layer stage using the given [[ServerSettings]]. The returned [[BidiFlow]] isn't reusable and
* can only be materialized once. The remoteAddress, if provided, will be added as a header to each [[HttpRequest]]
* this layer produces if the `akka.http.server.remote-address-header` configuration option is enabled.
*/
def serverLayer(
settings: ServerSettings,
remoteAddress: Optional[InetSocketAddress],
log: LoggingAdapter,
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala, log)(materializer))
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(connect: ConnectHttp, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(
connect: ConnectHttp,
settings: ServerSettings,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, settings = settings.asScala, connectionContext = connectionContext)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandle(
handler: Flow[HttpRequest, HttpResponse, _],
connect: ConnectHttp,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandle(
handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
connect.host, connect.port, connectionContext)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandle(
handler: Flow[HttpRequest, HttpResponse, _],
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandle(
handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleSync(
handler: Function[HttpRequest, HttpResponse],
connect: ConnectHttp,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleSync(handler.apply(_).asScala, connect.host, connect.port, connectionContext)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
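
For orientation, a minimal sketch of starting a server with the synchronous handler variant above (host, port and response body are placeholders):

import akka.actor.ActorSystem
import akka.http.javadsl.{ ConnectHttp, Http }
import akka.http.javadsl.model.{ HttpRequest, HttpResponse }
import akka.japi.Function
import akka.stream.ActorMaterializer

object BindAndHandleSyncSketch extends App {
  val system = ActorSystem("http-sketch")
  val materializer = ActorMaterializer.create(system)

  // Answers every request with a plain 200 response.
  val handler = new Function[HttpRequest, HttpResponse] {
    override def apply(request: HttpRequest): HttpResponse =
      HttpResponse.create().withStatus(200).withEntity("OK")
  }

  Http.get(system).bindAndHandleSync(handler, ConnectHttp.toHost("localhost", 8080), materializer)
}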
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleSync(
handler: Function[HttpRequest, HttpResponse],
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleSync(
handler.apply(_).asScala,
connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleAsync(
handler: Function[HttpRequest, CompletionStage[HttpResponse]],
connect: ConnectHttp,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleAsync(handler.apply(_).toScala, connect.host, connect.port, connectionContext)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleAsync(
handler: Function[HttpRequest, CompletionStage[HttpResponse]],
connect: ConnectHttp,
settings: ServerSettings,
parallelism: Int, log: LoggingAdapter,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleAsync(
handler.apply(_).toScala,
connect.host, connect.port, connectionContext, settings.asScala, parallelism, log)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Constructs a client layer stage using the configured default [[akka.http.javadsl.settings.ClientConnectionSettings]].
*/
def clientLayer(hostHeader: headers.Host): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] =
adaptClientLayer(delegate.clientLayer(JavaMapping.toScala(hostHeader)))
/**
* Constructs a client layer stage using the given [[akka.http.javadsl.settings.ClientConnectionSettings]].
*/
def clientLayer(
hostHeader: headers.Host,
settings: ClientConnectionSettings): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] =
adaptClientLayer(delegate.clientLayer(JavaMapping.toScala(hostHeader), settings.asScala))
/**
* Constructs a client layer stage using the given [[ClientConnectionSettings]].
*/
def clientLayer(
hostHeader: headers.Host,
settings: ClientConnectionSettings,
log: LoggingAdapter): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] =
adaptClientLayer(delegate.clientLayer(JavaMapping.toScala(hostHeader), settings.asScala, log))
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* If the hostname is given with an `https://` prefix, the default [[HttpsConnectionContext]] will be used.
*/
def outgoingConnection(host: String): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] =
outgoingConnection(ConnectHttp.toHost(host))
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*
* Use the [[ConnectHttp]] DSL to configure target host and whether HTTPS should be used.
*/
def outgoingConnection(to: ConnectHttp): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] =
adaptOutgoingFlow {
if (to.isHttps) delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala)
else delegate.outgoingConnection(to.host, to.port)
}
/**
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*/
def outgoingConnection(
to: ConnectHttp,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] =
adaptOutgoingFlow {
if (to.isHttps)
delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asInstanceOf[HttpsConnectionContext].asScala, localAddress.asScala, settings.asScala, log)
else
delegate.outgoingConnection(to.host, to.port, localAddress.asScala, settings.asScala, log)
}
/**
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
* the requests from all its materializations across this pool.
* While the started host connection pool internally shuts itself down automatically after the configured idle
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def newHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
newHostConnectionPool[T](ConnectHttp.toHost(host), materializer)
/**
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
* the requests from all its materializations across this pool.
* While the started host connection pool internally shuts itself down automatically after the configured idle
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def newHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](to.host, to.port)(materializer).mapMaterializedValue(_.toJava))
/**
* Same as [[newHostConnectionPool]] but with HTTPS encryption.
*
* The given [[ConnectionContext]] will be used for encryption on the connection.
*/
def newHostConnectionPool[T](
to: ConnectHttp,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow {
to.effectiveHttpsConnectionContext(defaultClientHttpsContext) match {
case https: HttpsConnectionContext ⇒
delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
case _ ⇒
delegate.newHostConnectionPool[T](to.host, to.port, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
}
}
/**
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
* configuration a separate connection pool is maintained.
* The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured.
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
*
* The internal caching logic guarantees that there will never be more than a single pool running for the
* given target host endpoint and configuration (in this ActorSystem).
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
cachedHostConnectionPool(ConnectHttp.toHost(host), materializer)
/**
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
* configuration a separate connection pool is maintained.
* The HTTP layer transparently manages idle shutdown and restarting of connection pools as configured.
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
*
* The internal caching logic guarantees that there will never be more than a single pool running for the
* given target host endpoint and configuration (in this ActorSystem).
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
* response for A.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](to.host, to.port)(materializer).mapMaterializedValue(_.toJava))
/**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
*
* The given [[ConnectionContext]] will be used for encryption on the connection.
*/
def cachedHostConnectionPool[T](
to: ConnectHttp,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolHttps[T](to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava))
/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
* depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or
* a valid `Host` header.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests `A` and `B` enter the flow in that order the response for `B` might be produced before the
* response for `A`.
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T]()(materializer))
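// Illustrative usage sketch (assumed names, not part of this commit): requests sent through the super pool
// must carry absolute URIs (or a `Host` header) so the right cached host pool can be selected.
//
//   val pool = Http.get(system).superPool[NotUsed](mat)
//   akka.stream.javadsl.Source
//     .single(Pair.create(HttpRequest.create("https://example.com/"), NotUsed.getInstance()))
//     .via(pool)
//     .runWith(akka.stream.javadsl.Sink.head[Pair[Try[HttpResponse], NotUsed]](), mat)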
/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
* depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or
* a valid `Host` header.
*
* The given [[HttpsConnectionContext]] is used to configure TLS for the connection.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the
* response for `A`.
*
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](
settings: ConnectionPoolSettings,
connectionContext: HttpsConnectionContext,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T](connectionContext.asScala, settings.asScala, log)(materializer))
/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
* depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or
* a valid `Host` header.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
* responses in an order that doesn't directly match the consumed requests.
* For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the
* response for `A`.
*
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T](defaultClientHttpsContext.asScala, settings.asScala, log)(materializer))
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
* effective URI to produce a response future.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
* the future will be completed with an error.
*/
def singleRequest(request: HttpRequest, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequest(request.asScala)(materializer).toJava
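// Illustrative usage sketch (assumed names): fire a single request through the cached pools and obtain
// the response as a CompletionStage.
//
//   val response: CompletionStage[HttpResponse] =
//     Http.get(system).singleRequest(HttpRequest.create("https://example.com/"), mat)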
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
* effective URI to produce a response future.
*
* The given [[HttpsConnectionContext]] is used to configure TLS for the connection.
*
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
* the future will be completed with an error.
*/
def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequest(request.asScala, connectionContext.asScala)(materializer).toJava
/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
* effective URI to produce a response future.
*
* The given [[HttpsConnectionContext]] will be used for encryption if the request is sent to an https endpoint.
*
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
* the future will be completed with an error.
*/
def singleRequest(
request: HttpRequest,
connectionContext: HttpsConnectionContext,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequest(request.asScala, connectionContext.asScala, settings.asScala, log)(materializer).toJava
/**
* Constructs a WebSocket [[BidiFlow]].
*
* The layer is not reusable and must only be materialized once.
*/
def webSocketClientLayer(request: WebSocketRequest): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] =
adaptWsBidiFlow(delegate.webSocketClientLayer(request.asScala))
/**
* Constructs a WebSocket [[BidiFlow]] using the given [[ClientConnectionSettings]] rather than the
* defaults from the `akka.http.client` config section.
*
* The layer is not reusable and must only be materialized once.
*/
def webSocketClientLayer(
request: WebSocketRequest,
settings: ClientConnectionSettings): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] =
adaptWsBidiFlow(delegate.webSocketClientLayer(request.asScala, settings.asScala))
/**
* Constructs a WebSocket [[BidiFlow]] using the given [[ClientConnectionSettings]] rather than the
* defaults from the `akka.http.client` config section.
*
* The layer is not reusable and must only be materialized once.
*/
def webSocketClientLayer(
request: WebSocketRequest,
settings: ClientConnectionSettings,
log: LoggingAdapter): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] =
adaptWsBidiFlow(delegate.webSocketClientLayer(request.asScala, settings.asScala, log))
/**
* Constructs a flow that once materialized establishes a WebSocket connection to the given Uri.
*
* The layer is not reusable and must only be materialized once.
*/
def webSocketClientFlow(request: WebSocketRequest): Flow[Message, Message, CompletionStage[WebSocketUpgradeResponse]] =
adaptWsFlow {
delegate.webSocketClientFlow(request.asScala)
}
/**
* Constructs a flow that once materialized establishes a WebSocket connection to the given Uri.
*
* The layer is not reusable and must only be materialized once.
*/
def webSocketClientFlow(
request: WebSocketRequest,
connectionContext: ConnectionContext,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[Message, Message, CompletionStage[WebSocketUpgradeResponse]] =
adaptWsFlow {
delegate.webSocketClientFlow(request.asScala, connectionContext.asScala, localAddress.asScala, settings.asScala, log)
}
/**
* Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the
* WebSocket conversation.
*
* The [[defaultClientHttpsContext]] is used to configure TLS for the connection.
*/
def singleWebSocketRequest[T](
request: WebSocketRequest,
clientFlow: Flow[Message, Message, T],
materializer: Materializer): Pair[CompletionStage[WebSocketUpgradeResponse], T] =
adaptWsResultTuple {
delegate.singleWebSocketRequest(
request.asScala,
adaptWsFlow[T](clientFlow))(materializer)
}
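// Illustrative usage sketch (assumed names): an identity client-side flow sends every message received
// from the server straight back to it; the returned pair gives access to the upgrade response and the
// flow's materialized value.
//
//   val clientFlow = akka.stream.javadsl.Flow.create[Message]()
//   val result: Pair[CompletionStage[WebSocketUpgradeResponse], NotUsed] =
//     Http.get(system).singleWebSocketRequest(
//       WebSocketRequest.create("ws://example.com/ws"), clientFlow, mat)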
/**
* Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the
* WebSocket conversation.
*
* The given [[ConnectionContext]] is used to configure TLS for the connection.
*/
def singleWebSocketRequest[T](
request: WebSocketRequest,
clientFlow: Flow[Message, Message, T],
connectionContext: ConnectionContext,
materializer: Materializer): Pair[CompletionStage[WebSocketUpgradeResponse], T] =
adaptWsResultTuple {
delegate.singleWebSocketRequest(
request.asScala,
adaptWsFlow[T](clientFlow),
connectionContext.asScala)(materializer)
}
/**
* Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the
* WebSocket conversation.
*/
def singleWebSocketRequest[T](
request: WebSocketRequest,
clientFlow: Flow[Message, Message, T],
connectionContext: ConnectionContext,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter,
materializer: Materializer): Pair[CompletionStage[WebSocketUpgradeResponse], T] =
adaptWsResultTuple {
delegate.singleWebSocketRequest(
request.asScala,
adaptWsFlow[T](clientFlow),
connectionContext.asScala,
localAddress.asScala,
settings.asScala,
log)(materializer)
}
/**
* Triggers an orderly shutdown of all host connection pools currently maintained by the [[ActorSystem]].
* The returned future is completed when all pools that were live at the time of this method call
* have completed their shutdown process.
*
* If existing pool client flows are re-used or new ones materialized concurrently with or after this
* method call, the respective connection pools will be restarted and will not contribute to the returned future.
*/
def shutdownAllConnectionPools(): CompletionStage[Unit] = delegate.shutdownAllConnectionPools().toJava
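// Illustrative usage sketch (assumed names): drain the pools before terminating the actor system.
//
//   Http.get(system).shutdownAllConnectionPools()
//     .thenRun(new Runnable { override def run(): Unit = system.terminate() })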
/**
* Gets the current default server-side [[ConnectionContext]]; defaults to plain HTTP.
* Can be modified using [[setDefaultServerHttpContext]], and will then apply for servers bound after that call has completed.
*/
def defaultServerHttpContext: ConnectionContext =
delegate.defaultServerHttpContext
/**
* Sets the default server-side [[ConnectionContext]].
* If it is an instance of [[HttpsConnectionContext]] then the server will be bound using HTTPS.
*/
def setDefaultServerHttpContext(context: ConnectionContext): Unit =
delegate.setDefaultServerHttpContext(context.asScala)
/**
* Gets the current default client-side [[ConnectionContext]].
*/
def defaultClientHttpsContext: akka.http.javadsl.HttpsConnectionContext = delegate.defaultClientHttpsContext
/**
* Sets the default client-side [[ConnectionContext]].
*/
def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit =
delegate.setDefaultClientHttpsContext(context.asScala)
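// Illustrative usage sketch (assumed names): derive a client HTTPS context from ssl-config and install it
// as the default used by `singleRequest` and the pool APIs.
//
//   val http = Http.get(system)
//   http.setDefaultClientHttpsContext(http.createClientHttpsContext(AkkaSSLConfig.get(system)))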
def createServerHttpsContext(sslConfig: AkkaSSLConfig): HttpsConnectionContext =
delegate.createServerHttpsContext(sslConfig)
def createClientHttpsContext(sslConfig: AkkaSSLConfig): HttpsConnectionContext =
delegate.createClientHttpsContext(sslConfig)
def createDefaultClientHttpsContext(): HttpsConnectionContext =
delegate.createDefaultClientHttpsContext()
private def adaptTupleFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = {
implicit val _ = JavaMapping.identity[T]
JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat])
}
private def adaptOutgoingFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[scaladsl.model.HttpRequest, scaladsl.model.HttpResponse, Future[scaladsl.Http.OutgoingConnection]]): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] =
Flow.fromGraph {
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
.viaMat(scalaFlow)(Keep.right)
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)
}
private def adaptServerLayer(serverLayer: scaladsl.Http.ServerLayer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
new BidiFlow(
JavaMapping.adapterBidiFlow[HttpResponse, sm.HttpResponse, sm.HttpRequest, HttpRequest]
.atop(serverLayer))
private def adaptClientLayer(clientLayer: scaladsl.Http.ClientLayer): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] =
new BidiFlow(
JavaMapping.adapterBidiFlow[HttpRequest, sm.HttpRequest, sm.HttpResponse, HttpResponse]
.atop(clientLayer))
private def adaptWsBidiFlow(wsLayer: scaladsl.Http.WebSocketClientLayer): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] =
new BidiFlow(
JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message]
.atopMat(wsLayer)((_, s) ⇒ adaptWsUpgradeResponse(s)))
private def adaptWsFlow(wsLayer: stream.scaladsl.Flow[sm.ws.Message, sm.ws.Message, Future[scaladsl.model.ws.WebSocketUpgradeResponse]]): Flow[Message, Message, CompletionStage[WebSocketUpgradeResponse]] =
Flow.fromGraph(JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message].joinMat(wsLayer)(Keep.right).mapMaterializedValue(adaptWsUpgradeResponse _))
private def adaptWsFlow[Mat](javaFlow: Flow[Message, Message, Mat]): stream.scaladsl.Flow[scaladsl.model.ws.Message, scaladsl.model.ws.Message, Mat] =
stream.scaladsl.Flow[scaladsl.model.ws.Message]
.map(Message.adapt)
.viaMat(javaFlow.asScala)(Keep.right)
.map(_.asScala)
private def adaptWsResultTuple[T](result: (Future[scaladsl.model.ws.WebSocketUpgradeResponse], T)): Pair[CompletionStage[WebSocketUpgradeResponse], T] =
result match {
case (fut, tMat) ⇒ Pair(adaptWsUpgradeResponse(fut), tMat)
}
private def adaptWsUpgradeResponse(responseFuture: Future[scaladsl.model.ws.WebSocketUpgradeResponse]): CompletionStage[WebSocketUpgradeResponse] =
responseFuture.map(WebSocketUpgradeResponse.adapt)(system.dispatcher).toJava
}

View file

@@ -4,8 +4,7 @@
package akka.event.slf4j
import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory }
import org.slf4j.MDC
import org.slf4j.{ MDC, Marker, MarkerFactory, Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory }
import akka.event.Logging._
import akka.actor._
import akka.event.DummyClassForStringSources
@@ -67,19 +66,27 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg
case event @ Error(cause, logSource, logClass, message) ⇒
withMdc(logSource, event) {
cause match {
case Error.NoCause | null ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else null)
case _ ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause)
case Error.NoCause | null ⇒
Logger(logClass, logSource).error(markerIfPresent(event), if (message != null) message.toString else null)
case _ ⇒
Logger(logClass, logSource).error(markerIfPresent(event), if (message != null) message.toString else cause.getLocalizedMessage, cause)
}
}
case event @ Warning(logSource, logClass, message) ⇒
withMdc(logSource, event) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) }
withMdc(logSource, event) {
Logger(logClass, logSource).warn(markerIfPresent(event), "{}", message.asInstanceOf[AnyRef])
}
case event @ Info(logSource, logClass, message) ⇒
withMdc(logSource, event) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) }
withMdc(logSource, event) {
Logger(logClass, logSource).info(markerIfPresent(event), "{}", message.asInstanceOf[AnyRef])
}
case event @ Debug(logSource, logClass, message) ⇒
withMdc(logSource, event) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) }
withMdc(logSource, event) {
Logger(logClass, logSource).debug(markerIfPresent(event), "{}", message.asInstanceOf[AnyRef])
}
case InitializeLogger(_) ⇒
log.info("Slf4jLogger started")
@@ -102,6 +109,12 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg
}
}
private final def markerIfPresent(event: LogEvent): Marker =
event match {
case m: LogEventWithMarker ⇒ MarkerFactory.getMarker(m.marker.name)
case _ ⇒ null
}
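// Hedged illustration (names taken from elsewhere in this commit): an event produced via
//   Logging.withMarker(system, this).info(LogMarker("SECURITY"), "suspicious access")
// arrives here as a LogEventWithMarker carrying the marker name "SECURITY"; markerIfPresent turns it into
// MarkerFactory.getMarker("SECURITY"), which logback can render via the %marker conversion word in its pattern.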
/**
* Override this method to provide a differently formatted timestamp
* @param timestamp a "currentTimeMillis"-obtained timestamp

View file

@@ -7,7 +7,7 @@
</appender>
<appender name="TEST" class="akka.event.slf4j.Slf4jLoggerSpec$TestAppender">
<encoder>
<pattern>%date{ISO8601} level=[%level] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
</encoder>
</appender>
<logger name="akka.event.slf4j.Slf4jLoggingFilterSpec$DebugLevelProducer"
@@ -25,4 +25,4 @@
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
</configuration>

View file

@@ -4,13 +4,14 @@
package akka.event.slf4j
import language.postfixOps
import akka.testkit.AkkaSpec
import akka.actor.{ DiagnosticActorLogging, Actor, Props }
import akka.actor.{ Actor, DiagnosticActorLogging, Props }
import scala.concurrent.duration._
import akka.event.Logging
import akka.event.{ LogMarker, Logging }
import ch.qos.logback.core.OutputStreamAppender
import java.io.ByteArrayOutputStream
import org.scalatest.BeforeAndAfterEach
object Slf4jLoggerSpec {
@@ -25,9 +26,12 @@ object Slf4jLoggerSpec {
}
"""
case class StringWithMDC(s: String, mdc: Map[String, Any])
final case class StringWithMDC(s: String, mdc: Map[String, Any])
final case class StringWithMarker(s: String, marker: LogMarker)
class LogProducer extends Actor with DiagnosticActorLogging {
final class LogProducer extends Actor with DiagnosticActorLogging {
val markLog = Logging.withMarker(this)
def receive = {
case e: Exception ⇒
@@ -38,6 +42,8 @@ object Slf4jLoggerSpec {
log.mdc(mdc)
log.info(s)
log.clearMDC()
case StringWithMarker(s, marker) ⇒
markLog.info(marker, s)
}
}
@@ -95,6 +101,15 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
s should include("msg=[test x=3 y=17]")
}
"log info with marker" in {
producer ! StringWithMarker("security-wise interesting message", LogMarker("SECURITY"))
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s should include("marker=[SECURITY]")
s should include("msg=[security-wise interesting message]")
}
"put custom MDC values when specified" in {
producer ! StringWithMDC("Message with custom MDC values", Map("ticketNumber" → 3671, "ticketDesc" → "Custom MDC Values"))

View file

@@ -0,0 +1,15 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package com.lightbend.sslconfig.akka
import javax.net.ssl.{ SSLContext, SSLEngine }
/**
* Gives the chance to configure the SSLContext before it is going to be used.
* The passed-in context will already be set to client mode and provided with hostInfo during initialization.
*/
trait SSLEngineConfigurator {
def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine
}
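// Hypothetical implementation sketch (class name invented for illustration): restrict every engine handed
// to this configurator to TLSv1.2 before it is used.
//
//   final class Tls12OnlyConfigurator extends SSLEngineConfigurator {
//     override def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine = {
//       engine.setEnabledProtocols(Array("TLSv1.2"))
//       engine
//     }
//   }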

View file

@@ -0,0 +1,12 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package com.lightbend.sslconfig.akka.util

import akka.actor.ActorSystem
import akka.event.DummyClassForStringSources
import com.typesafe.sslconfig.util.{ LoggerFactory, NoDepsLogger }

/** INTERNAL API */
final class AkkaLoggerFactory(system: ActorSystem) extends LoggerFactory {
override def apply(clazz: Class[_]): NoDepsLogger = new AkkaLoggerBridge(system.eventStream, clazz)
override def apply(name: String): NoDepsLogger = new AkkaLoggerBridge(system.eventStream, name, classOf[DummyClassForStringSources])
}