diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 269b14ebb9..ea3292bb3e 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -5,11 +5,13 @@ package akka.event import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger +import java.util.logging import akka.actor.ActorSystem.Settings import akka.actor._ import akka.dispatch.RequiresMessageQueue -import akka.util.ReentrantGuard +import akka.event.Logging.{ Extension ⇒ _, _ } +import akka.util.{ OptionVal, ReentrantGuard } import akka.util.Helpers.toRootLowerCase import akka.{ AkkaException, ConfigurationException } @@ -379,14 +381,12 @@ object Logging { /** * Returns a 'safe' getSimpleName for the provided object's Class - * @param obj * @return the simple name of the given object's Class */ def simpleName(obj: AnyRef): String = simpleName(obj.getClass) /** * Returns a 'safe' getSimpleName for the provided Class - * @param clazz * @return the simple name of the given Class */ def simpleName(clazz: Class[_]): String = { @@ -511,6 +511,30 @@ object Logging { val (str, clazz) = LogSource(logSource, system) new BusLogging(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter) } + /** + * Obtain LoggingAdapter with additional "marker" support (which some logging frameworks are able to utilise) + * for the given actor system and source object. This will use the system’s event stream and include the system’s + * address in the log source string. + * + * Do not use this if you want to supply a log category string (like + * “com.example.app.whatever”) unaltered, supply `system.eventStream` in this + * case or use + * + * {{{ + * Logging(system, this.getClass) + * }}} + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. + * + * You can add your own rules quite easily, see [[akka.event.LogSource]]. + */ + def withMarker[T: LogSource](system: ActorSystem, logSource: T): MarkerLoggingAdapter = { + val (str, clazz) = LogSource(logSource, system) + new MarkerLoggingAdapter(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter) + } /** * Obtain LoggingAdapter for the given logging bus and source object. @@ -530,6 +554,24 @@ object Logging { val (str, clazz) = LogSource(logSource) new BusLogging(bus, str, clazz) } + /** + * Obtain LoggingAdapter for the given logging bus and source object. + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. + * + * You can add your own rules quite easily, see [[akka.event.LogSource]]. + * + * Note that this `LoggingAdapter` will use the [[akka.event.DefaultLoggingFilter]], + * and not the [[akka.event.LoggingFilter]] configured for the system + * (if different from `DefaultLoggingFilter`). + */ + def withMarker[T: LogSource](bus: LoggingBus, logSource: T): MarkerLoggingAdapter = { + val (str, clazz) = LogSource(logSource) + new MarkerLoggingAdapter(bus, str, clazz) + } /** * Obtain LoggingAdapter with MDC support for the given actor. @@ -540,6 +582,15 @@ object Logging { val system = logSource.context.system.asInstanceOf[ExtendedActorSystem] new BusLogging(system.eventStream, str, clazz, system.logFilter) with DiagnosticLoggingAdapter } + /** + * Obtain LoggingAdapter with marker and MDC support for the given actor. + * Don't use it outside its specific Actor as it isn't thread safe + */ + def withMarker(logSource: Actor): DiagnosticMarkerBusLoggingAdapter = { + val (str, clazz) = LogSource(logSource) + val system = logSource.context.system.asInstanceOf[ExtendedActorSystem] + new DiagnosticMarkerBusLoggingAdapter(system.eventStream, str, clazz, system.logFilter) + } /** * Obtain LoggingAdapter for the given actor system and source object. This @@ -674,6 +725,14 @@ object Logging { case InfoLevel ⇒ Info(logSource, logClass, message, mdc) case DebugLevel ⇒ Debug(logSource, logClass, message, mdc) } + + def apply(level: LogLevel, logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker): LogEvent = level match { + case ErrorLevel ⇒ Error(logSource, logClass, message, mdc, marker) + case WarningLevel ⇒ Warning(logSource, logClass, message, mdc, marker) + case InfoLevel ⇒ Info(logSource, logClass, message, mdc, marker) + case DebugLevel ⇒ Debug(logSource, logClass, message, mdc, marker) + } + } /** @@ -686,11 +745,26 @@ object Logging { class Error2(cause: Throwable, logSource: String, logClass: Class[_], message: Any = "", override val mdc: MDC) extends Error(cause, logSource, logClass, message) { def this(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = this(Error.NoCause, logSource, logClass, message, mdc) } + class Error3(cause: Throwable, logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker) + extends Error2(cause, logSource, logClass, message, mdc) with LogEventWithMarker { + def this(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = this(Error.NoCause, logSource, logClass, message, mdc, marker) + } object Error { - def apply(logSource: String, logClass: Class[_], message: Any) = new Error(NoCause, logSource, logClass, message) - def apply(cause: Throwable, logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Error2(cause, logSource, logClass, message, mdc) - def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Error2(NoCause, logSource, logClass, message, mdc) + def apply(logSource: String, logClass: Class[_], message: Any) = + new Error(NoCause, logSource, logClass, message) + def apply(logSource: String, logClass: Class[_], message: Any, marker: LogMarker) = + new Error3(NoCause, logSource, logClass, message, Map.empty, marker) + + def apply(cause: Throwable, logSource: String, logClass: Class[_], message: Any, mdc: MDC) = + new Error2(cause, logSource, logClass, message, mdc) + def apply(cause: Throwable, logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = + new Error3(cause, logSource, logClass, message, mdc, marker) + + def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = + new Error2(NoCause, logSource, logClass, message, mdc) + def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = + new Error3(NoCause, logSource, logClass, message, mdc, marker) /** Null Object used for errors without cause Throwable */ object NoCause extends NoStackTrace @@ -704,8 +778,11 @@ object Logging { override def level = WarningLevel } class Warning2(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC) extends Warning(logSource, logClass, message) + class Warning3(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker) + extends Warning2(logSource, logClass, message, mdc) with LogEventWithMarker object Warning { def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Warning2(logSource, logClass, message, mdc) + def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = new Warning3(logSource, logClass, message, mdc, marker) } /** @@ -715,8 +792,11 @@ object Logging { override def level = InfoLevel } class Info2(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC) extends Info(logSource, logClass, message) + class Info3(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker) + extends Info2(logSource, logClass, message, mdc) with LogEventWithMarker object Info { def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Info2(logSource, logClass, message, mdc) + def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = new Info3(logSource, logClass, message, mdc, marker) } /** @@ -726,8 +806,21 @@ object Logging { override def level = DebugLevel } class Debug2(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC) extends Debug(logSource, logClass, message) + class Debug3(logSource: String, logClass: Class[_], message: Any, override val mdc: MDC, override val marker: LogMarker) + extends Debug2(logSource, logClass, message, mdc) with LogEventWithMarker object Debug { def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC) = new Debug2(logSource, logClass, message, mdc) + def apply(logSource: String, logClass: Class[_], message: Any, mdc: MDC, marker: LogMarker) = new Debug3(logSource, logClass, message, mdc, marker) + } + + /** INTERNAL API, Marker interface for LogEvents containing Markers, which can be set for example on an slf4j logger */ + sealed trait LogEventWithMarker extends LogEvent { + def marker: LogMarker + /** Appends the marker to the Debug/Info/Warning/Error toString representations */ + override def toString = { + val s = super.toString + s.substring(0, s.length - 1) + "," + marker + ")" + } } /** @@ -766,16 +859,22 @@ object Logging { class LoggerInitializationException(msg: String) extends AkkaException(msg) trait StdOutLogger { + + import StdOutLogger._ import java.text.SimpleDateFormat import java.util.Date private val date = new Date() private val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.SSS") - private val errorFormat = "[ERROR] [%s] [%s] [%s] %s%s" - private val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s" - private val warningFormat = "[WARN] [%s] [%s] [%s] %s" - private val infoFormat = "[INFO] [%s] [%s] [%s] %s" - private val debugFormat = "[DEBUG] [%s] [%s] [%s] %s" + + // format: OFF + // FIXME: remove those when we have the chance to break binary compatibility + private val errorFormat = ErrorFormat + private val errorFormatWithoutCause = ErrorFormatWithoutCause + private val warningFormat = WarningFormat + private val infoFormat = InfoFormat + private val debugFormat = DebugFormat + // format: ON def timestamp(event: LogEvent): String = synchronized { date.setTime(event.timestamp) @@ -790,36 +889,92 @@ object Logging { case e ⇒ warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e)) } - def error(event: Error): Unit = { - val f = if (event.cause == Error.NoCause) errorFormatWithoutCause else errorFormat - println(f.format( - timestamp(event), - event.thread.getName, - event.logSource, - event.message, - stackTraceFor(event.cause))) + def error(event: Error): Unit = event match { + case e: Error3 ⇒ // has marker + val f = if (event.cause == Error.NoCause) ErrorWithoutCauseWithMarkerFormat else ErrorFormatWithMarker + println(f.format( + e.marker.name, + timestamp(event), + event.thread.getName, + event.logSource, + event.message, + stackTraceFor(event.cause))) + case _ ⇒ + val f = if (event.cause == Error.NoCause) ErrorFormatWithoutCause else ErrorFormat + println(f.format( + timestamp(event), + event.thread.getName, + event.logSource, + event.message, + stackTraceFor(event.cause))) } - def warning(event: Warning): Unit = - println(warningFormat.format( - timestamp(event), - event.thread.getName, - event.logSource, - event.message)) + def warning(event: Warning): Unit = event match { + case e: Warning3 ⇒ // has marker + println(WarningWithMarkerFormat.format( + e.marker.name, + timestamp(event), + event.thread.getName, + event.logSource, + event.message)) + case _ ⇒ + println(WarningFormat.format( + timestamp(event), + event.thread.getName, + event.logSource, + event.message)) + } - def info(event: Info): Unit = - println(infoFormat.format( - timestamp(event), - event.thread.getName, - event.logSource, - event.message)) + def info(event: Info): Unit = event match { + case e: Info3 ⇒ // has marker + println(InfoWithMarkerFormat.format( + e.marker.name, + timestamp(event), + event.thread.getName, + event.logSource, + event.message)) + case _ ⇒ + println(InfoFormat.format( + timestamp(event), + event.thread.getName, + event.logSource, + event.message)) + } - def debug(event: Debug): Unit = - println(debugFormat.format( - timestamp(event), - event.thread.getName, - event.logSource, - event.message)) + def debug(event: Debug): Unit = event match { + case e: Debug3 ⇒ // has marker + println(DebugWithMarkerFormat.format( + e.marker.name, + timestamp(event), + event.thread.getName, + event.logSource, + event.message)) + case _ ⇒ + println(DebugFormat.format( + timestamp(event), + event.thread.getName, + event.logSource, + event.message)) + } + } + object StdOutLogger { + // format: OFF + private final val ErrorFormat = "[ERROR] [%s] [%s] [%s] %s%s" + private final val ErrorFormatWithMarker = "[ERROR] [%s][%s] [%s] [%s] %s%s" + + private final val ErrorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s" + private final val ErrorWithoutCauseWithMarkerFormat = "[ERROR] [%s][%s] [%s] [%s] %s" + + private final val WarningFormat = "[WARN] [%s] [%s] [%s] %s" + private final val WarningWithMarkerFormat = "[WARN] [%s][%s] [%s] [%s] %s" + + private final val InfoFormat = "[INFO] [%s] [%s] [%s] %s" + private final val InfoWithMarkerFormat = "[INFO] [%s][%s] [%s] [%s] %s" + + private final val DebugFormat = "[DEBUG] [%s] [%s] [%s] %s" + private final val DebugWithMarkerFormat = "[DEBUG] [%s][%s] [%s] [%s] %s" + + // format: ON } /** @@ -1228,6 +1383,249 @@ trait DiagnosticLoggingAdapter extends LoggingAdapter { def clearMDC(): Unit = mdc(emptyMDC) } +final class LogMarker(val name: String) +object LogMarker { + /** The Marker is internally transferred via MDC using using this key */ + private[akka] final val MDCKey = "marker" + + def apply(name: String): LogMarker = new LogMarker(name) + /** Java API */ + def create(name: String): LogMarker = apply(name) + + def extractFromMDC(mdc: MDC): Option[String] = + mdc.get(MDCKey) match { + case Some(v) ⇒ Some(v.toString) + case None ⇒ None + } + +} + +/** + * [[LoggingAdapter]] extension which adds Marker support. + * Only recommended to be used within Actors as it isn't thread safe. + */ +class MarkerLoggingAdapter( + override val bus: LoggingBus, + override val logSource: String, + override val logClass: Class[_], + loggingFilter: LoggingFilter) + extends BusLogging(bus, logSource, logClass, loggingFilter) { + // TODO when breaking binary compatibility, these marker methods should become baked into LoggingAdapter itself + + // For backwards compatibility, and when LoggingAdapter is created without direct + // association to an ActorSystem + def this(bus: LoggingBus, logSource: String, logClass: Class[_]) = + this(bus, logSource, logClass, new DefaultLoggingFilter(() ⇒ bus.logLevel)) + + /** + * Log message at error level, including the exception that caused the error. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, cause: Throwable, message: String): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, message, mdc, marker)) + + /** + * Message template with 1 replacement argument. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1), mdc, marker)) + + /** + * Message template with 2 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2), mdc, marker)) + + /** + * Message template with 3 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker)) + + /** + * Message template with 4 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker)) + + /** + * Log message at error level, without providing the exception that caused the error. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, message: String): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, message, mdc, marker)) + + /** + * Message template with 1 replacement argument. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, template: String, arg1: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1), mdc, marker)) + + /** + * Message template with 2 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2), mdc, marker)) + + /** + * Message template with 3 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker)) + + /** + * Message template with 4 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def error(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker)) + + /** + * Log message at warning level. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def warning(marker: LogMarker, message: String): Unit = + if (isErrorEnabled) bus.publish(Error(logSource, logClass, message, mdc, marker)) + + /** + * Message template with 1 replacement argument. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def warning(marker: LogMarker, template: String, arg1: Any): Unit = + if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1), mdc, marker)) + + /** + * Message template with 2 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def warning(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit = + if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1, arg2), mdc, marker)) + + /** + * Message template with 3 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def warning(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker)) + + /** + * Message template with 4 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def warning(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = + if (isWarningEnabled) bus.publish(Warning(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker)) + + /** + * Log message at info level. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def info(marker: LogMarker, message: String): Unit = + if (isInfoEnabled) bus.publish(Info(logSource, logClass, message, mdc, marker)) + + /** + * Message template with 1 replacement argument. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def info(marker: LogMarker, template: String, arg1: Any): Unit = + if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1), mdc, marker)) + + /** + * Message template with 2 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def info(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit = + if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1, arg2), mdc, marker)) + + /** + * Message template with 3 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def info(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker)) + + /** + * Message template with 4 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def info(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = + if (isInfoEnabled) bus.publish(Info(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker)) + + /** + * Log message at debug level. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def debug(marker: LogMarker, message: String): Unit = + if (isDebugEnabled) bus.publish(Debug(logSource, logClass, message, mdc, marker)) + + /** + * Message template with 1 replacement argument. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def debug(marker: LogMarker, template: String, arg1: Any): Unit = + if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1), mdc, marker)) + + /** + * Message template with 2 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def debug(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit = + if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1, arg2), mdc, marker)) + + /** + * Message template with 3 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def debug(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1, arg2, arg3), mdc, marker)) + + /** + * Message template with 4 replacement arguments. + * The marker argument can be picked up by various logging frameworks such as slf4j to mark this log statement as "special". + * @see [[LoggingAdapter]] + */ + def debug(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = + if (isDebugEnabled) bus.publish(Debug(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker)) + +} + +final class DiagnosticMarkerBusLoggingAdapter( + override val bus: LoggingBus, + override val logSource: String, + override val logClass: Class[_], + loggingFilter: LoggingFilter) + extends MarkerLoggingAdapter(bus, logSource, logClass, loggingFilter) with DiagnosticLoggingAdapter + /** * [[akka.event.LoggingAdapter]] that publishes [[akka.event.Logging.LogEvent]] to event stream. */ @@ -1246,11 +1644,16 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class def isInfoEnabled = loggingFilter.isInfoEnabled(logClass, logSource) def isDebugEnabled = loggingFilter.isDebugEnabled(logClass, logSource) - protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, message, mdc)) - protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, message, mdc)) - protected def notifyWarning(message: String): Unit = bus.publish(Warning(logSource, logClass, message, mdc)) - protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message, mdc)) - protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message, mdc)) + protected def notifyError(message: String): Unit = + bus.publish(Error(logSource, logClass, message, mdc)) + protected def notifyError(cause: Throwable, message: String): Unit = + bus.publish(Error(cause, logSource, logClass, message, mdc)) + protected def notifyWarning(message: String): Unit = + bus.publish(Warning(logSource, logClass, message, mdc)) + protected def notifyInfo(message: String): Unit = + bus.publish(Info(logSource, logClass, message, mdc)) + protected def notifyDebug(message: String): Unit = + bus.publish(Debug(logSource, logClass, message, mdc)) } /** diff --git a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala index 154cdfcf04..6c3eff2d63 100644 --- a/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala +++ b/akka-camel/src/test/scala/akka/camel/DefaultCamelTest.scala @@ -8,13 +8,13 @@ import akka.camel.TestSupport.SharedCamelSystem import internal.DefaultCamel import org.scalatest.Matchers import org.scalatest.mock.MockitoSugar -import org.apache.camel.{ ProducerTemplate } +import org.apache.camel.ProducerTemplate import org.scalatest.WordSpec -import akka.event.LoggingAdapter +import akka.event.{ LoggingAdapter, MarkerLoggingAdapter } import akka.actor.ActorSystem.Settings import com.typesafe.config.ConfigFactory import org.apache.camel.impl.DefaultCamelContext -import akka.actor.{ ExtendedActorSystem } +import akka.actor.ExtendedActorSystem class DefaultCamelTest extends WordSpec with SharedCamelSystem with Matchers with MockitoSugar { @@ -26,7 +26,7 @@ class DefaultCamelTest extends WordSpec with SharedCamelSystem with Matchers wit when(sys.name) thenReturn ("mocksystem") def camelWithMocks = new DefaultCamel(sys) { - override val log = mock[LoggingAdapter] + override val log = mock[MarkerLoggingAdapter] override lazy val template = mock[ProducerTemplate] override lazy val context = mock[DefaultCamelContext] override val settings = mock[CamelSettings] diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 889d3c697b..81dce6d1ac 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -8,23 +8,27 @@ import language.postfixOps import org.scalatest.mock.MockitoSugar import org.mockito.Matchers.any import org.mockito.Mockito._ -import org.apache.camel.{ ProducerTemplate, AsyncCallback } +import org.apache.camel.{ AsyncCallback, ProducerTemplate } import java.util.concurrent.atomic.AtomicBoolean + import scala.concurrent.duration._ import java.lang.String + import akka.camel._ -import internal.{ DefaultCamel, CamelExchangeAdapter } -import org.scalatest.{ Suite, WordSpecLike, BeforeAndAfterAll, BeforeAndAfterEach } +import internal.{ CamelExchangeAdapter, DefaultCamel } +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Suite, WordSpecLike } import akka.camel.TestSupport._ -import java.util.concurrent.{ TimeoutException, CountDownLatch } -import org.mockito.{ ArgumentMatcher, Matchers ⇒ MMatchers, Mockito } +import java.util.concurrent.{ CountDownLatch, TimeoutException } + +import org.mockito.{ ArgumentMatcher, Mockito, Matchers ⇒ MMatchers } import org.scalatest.Matchers -import akka.actor.Status.{ Failure } +import akka.actor.Status.Failure import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem.Settings -import akka.event.LoggingAdapter -import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe } +import akka.event.{ LoggingAdapter, MarkerLoggingAdapter } +import akka.testkit.{ TestKit, TestLatch, TestProbe, TimingTest } import org.apache.camel.impl.DefaultCamelContext + import scala.concurrent.{ Await, Future } import akka.util.Timeout import akka.actor._ @@ -350,7 +354,7 @@ private[camel] trait ActorProducerFixture extends MockitoSugar with BeforeAndAft when(sys.name) thenReturn ("mocksystem") def camelWithMocks = new DefaultCamel(sys) { - override val log = mock[LoggingAdapter] + override val log = mock[MarkerLoggingAdapter] override lazy val template = mock[ProducerTemplate] override lazy val context = mock[DefaultCamelContext] override val settings = new CamelSettings(ConfigFactory.parseString( diff --git a/akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala b/akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala index a29079e148..d7d0da7542 100644 --- a/akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala +++ b/akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala @@ -70,25 +70,20 @@ trait JavaLoggingAdapter extends LoggingAdapter { def isDebugEnabled = logger.isLoggable(logging.Level.CONFIG) - protected def notifyError(message: String) { + protected def notifyError(message: String): Unit = log(logging.Level.SEVERE, null, message) - } - protected def notifyError(cause: Throwable, message: String) { + protected def notifyError(cause: Throwable, message: String): Unit = log(logging.Level.SEVERE, cause, message) - } - protected def notifyWarning(message: String) { + protected def notifyWarning(message: String): Unit = log(logging.Level.WARNING, null, message) - } - protected def notifyInfo(message: String) { + protected def notifyInfo(message: String): Unit = log(logging.Level.INFO, null, message) - } - protected def notifyDebug(message: String) { + protected def notifyDebug(message: String): Unit = log(logging.Level.CONFIG, null, message) - } @inline def log(level: logging.Level, cause: Throwable, message: String) { diff --git a/akka-docs/rst/java/logging.rst b/akka-docs/rst/java/logging.rst index 9d95c48233..fccc064e87 100644 --- a/akka-docs/rst/java/logging.rst +++ b/akka-docs/rst/java/logging.rst @@ -434,3 +434,24 @@ Now, the values will be available in the MDC, so you can use them in the layout + +Using Markers +------------- + +Some logging libraries allow, in addition to MDC data, attaching so called "markers" to log statements. +These are used to filter out rare and special events, for example you might want to mark logs that detect +some malicious activity and mark them with a ``SECURITY`` tag, and in your appender configuration make these +trigger emails and other notifications immediately. + +Markers are available through the LoggingAdapters, when obtained via ``Logging.withMarker``. +The first argument passed into all log calls then should be a ``akka.event.LogMarker``. + +The slf4j bridge provided by akka in ``akka-slf4j`` will automatically pick up this marker value and make it available to SLF4J. +For example you could use it like this:: + + %date{ISO8601} [%marker][%level] [%msg]%n + +A more advanced (including most Akka added information) example pattern would be:: + + %date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n + diff --git a/akka-docs/rst/scala/logging.rst b/akka-docs/rst/scala/logging.rst index 796ea7aca1..7d93c66cda 100644 --- a/akka-docs/rst/scala/logging.rst +++ b/akka-docs/rst/scala/logging.rst @@ -446,7 +446,7 @@ get it you will use the factory receiving an Actor as logSource: val log: DiagnosticLoggingAdapter = Logging(this); Once you have the logger, you just need to add the custom values before you log something. -This way, the values will be put in the SLF4J MDC right before appending the log and removed after. +This way, the values will dologbe put in the SLF4J MDC right before appending the log and removed after. .. note:: @@ -473,3 +473,24 @@ Now, the values will be available in the MDC, so you can use them in the layout + +Using Markers +------------- + +Some logging libraries allow, in addition to MDC data, attaching so called "markers" to log statements. +These are used to filter out rare and special events, for example you might want to mark logs that detect +some malicious activity and mark them with a ``SECURITY`` tag, and in your appender configuration make these +trigger emails and other notifications immediately. + +Markers are available through the LoggingAdapters, when obtained via ``Logging.withMarker``. +The first argument passed into all log calls then should be a ``akka.event.LogMarker``. + +The slf4j bridge provided by akka in ``akka-slf4j`` will automatically pick up this marker value and make it available to SLF4J. +For example you could use it like this:: + + %date{ISO8601} [%marker][%level] [%msg]%n + +A more advanced (including most Akka added information) example pattern would be:: + + %date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n + diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala new file mode 100644 index 0000000000..93f97719b0 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.http.javadsl + +import java.util.{ Optional, Collection ⇒ JCollection } +import javax.net.ssl.{ SSLContext, SSLParameters } + +import akka.http.scaladsl +import akka.japi.Util +import akka.stream.TLSClientAuth +import akka.http.impl.util.JavaMapping.Implicits._ +import com.lightbend.sslconfig.akka.AkkaSSLConfig + +import scala.compat.java8.OptionConverters +import scala.collection.JavaConverters._ + +object ConnectionContext { + //#https-context-creation + // ConnectionContext + /** Used to serve HTTPS traffic. */ + def https(sslContext: SSLContext): HttpsConnectionContext = + scaladsl.ConnectionContext.https(sslContext) + + /** Used to serve HTTPS traffic. */ + def https( + sslContext: SSLContext, + sslConfig: Optional[AkkaSSLConfig], + enabledCipherSuites: Optional[JCollection[String]], + enabledProtocols: Optional[JCollection[String]], + clientAuth: Optional[TLSClientAuth], + sslParameters: Optional[SSLParameters]) = + scaladsl.ConnectionContext.https( + sslContext, + OptionConverters.toScala(sslConfig), + OptionConverters.toScala(enabledCipherSuites).map(Util.immutableSeq(_)), + OptionConverters.toScala(enabledProtocols).map(Util.immutableSeq(_)), + OptionConverters.toScala(clientAuth), + OptionConverters.toScala(sslParameters)) + //#https-context-creation + + /** Used to serve HTTPS traffic. */ + // for binary-compatibility, since 2.4.7 + def https( + sslContext: SSLContext, + enabledCipherSuites: Optional[JCollection[String]], + enabledProtocols: Optional[JCollection[String]], + clientAuth: Optional[TLSClientAuth], + sslParameters: Optional[SSLParameters]) = + scaladsl.ConnectionContext.https( + sslContext, + OptionConverters.toScala(enabledCipherSuites).map(Util.immutableSeq(_)), + OptionConverters.toScala(enabledProtocols).map(Util.immutableSeq(_)), + OptionConverters.toScala(clientAuth), + OptionConverters.toScala(sslParameters)) + + /** Used to serve HTTP traffic. */ + def noEncryption(): HttpConnectionContext = + scaladsl.ConnectionContext.noEncryption() +} + +abstract class ConnectionContext { + def isSecure: Boolean + /** Java API */ + def getDefaultPort: Int + def sslConfig: Option[AkkaSSLConfig] +} + +abstract class HttpConnectionContext extends akka.http.javadsl.ConnectionContext { + override final def isSecure = false + override final def getDefaultPort = 80 + override def sslConfig: Option[AkkaSSLConfig] = None +} + +abstract class HttpsConnectionContext extends akka.http.javadsl.ConnectionContext { + override final def isSecure = true + override final def getDefaultPort = 443 + + /** Java API */ + def getEnabledCipherSuites: Optional[JCollection[String]] + /** Java API */ + def getEnabledProtocols: Optional[JCollection[String]] + /** Java API */ + def getClientAuth: Optional[TLSClientAuth] + + /** Java API */ + def getSslContext: SSLContext + /** Java API */ + def getSslParameters: Optional[SSLParameters] +} diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala new file mode 100644 index 0000000000..d669329d32 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -0,0 +1,751 @@ +/* + * Copyright (C) 2009-2016 Lightbend Inc. + */ + +package akka.http.javadsl + +import java.net.InetSocketAddress +import java.util.Optional + +import akka.http.impl.util.JavaMapping +import akka.http.impl.util.JavaMapping.HttpsConnectionContext +import akka.http.javadsl.model.ws._ +import akka.http.javadsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } +import akka.{ NotUsed, stream } +import akka.stream.TLSProtocol._ + +import scala.concurrent.Future +import scala.util.Try +import akka.stream.scaladsl.Keep +import akka.japi.{ Function, Pair } +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider } +import akka.event.LoggingAdapter +import akka.stream.Materializer +import akka.stream.javadsl.{ BidiFlow, Flow, Source } +import akka.http.impl.util.JavaMapping.Implicits._ +import akka.http.scaladsl.{ model ⇒ sm } +import akka.http.javadsl.model._ +import akka.http._ + +import scala.compat.java8.OptionConverters._ +import scala.compat.java8.FutureConverters._ +import java.util.concurrent.CompletionStage + +import com.lightbend.sslconfig.akka.AkkaSSLConfig + +object Http extends ExtensionId[Http] with ExtensionIdProvider { + override def get(system: ActorSystem): Http = super.get(system) + def lookup() = Http + def createExtension(system: ExtendedActorSystem): Http = new Http(system) +} + +class Http(system: ExtendedActorSystem) extends akka.actor.Extension { + import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ ec } + + import language.implicitConversions + private implicit def completionStageCovariant[T, U >: T](in: CompletionStage[T]): CompletionStage[U] = in.asInstanceOf[CompletionStage[U]] + private implicit def javaModelIsScalaModel[J <: AnyRef, S <: J](in: Future[J])(implicit ev: JavaMapping.Inherited[J, S]): Future[S] = in.asInstanceOf[Future[S]] + + private lazy val delegate = akka.http.scaladsl.Http(system) + + /** + * Constructs a server layer stage using the configured default [[akka.http.javadsl.settings.ServerSettings]]. The returned [[BidiFlow]] isn't + * reusable and can only be materialized once. + */ + def serverLayer(materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = + adaptServerLayer(delegate.serverLayer()(materializer)) + + /** + * Constructs a server layer stage using the given [[akka.http.javadsl.settings.ServerSettings]]. The returned [[BidiFlow]] isn't reusable and + * can only be materialized once. + */ + def serverLayer( + settings: ServerSettings, + materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = + adaptServerLayer(delegate.serverLayer(settings.asScala)(materializer)) + + /** + * Constructs a server layer stage using the given [[akka.http.javadsl.settings.ServerSettings]]. The returned [[BidiFlow]] isn't reusable and + * can only be materialized once. The `remoteAddress`, if provided, will be added as a header to each [[HttpRequest]] + * this layer produces if the `akka.http.server.remote-address-header` configuration option is enabled. + */ + def serverLayer( + settings: ServerSettings, + remoteAddress: Optional[InetSocketAddress], + materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = + adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala)(materializer)) + + /** + * Constructs a server layer stage using the given [[ServerSettings]]. The returned [[BidiFlow]] isn't reusable and + * can only be materialized once. The remoteAddress, if provided, will be added as a header to each [[HttpRequest]] + * this layer produces if the `akka.http.server.remote-address-header` configuration option is enabled. + */ + def serverLayer( + settings: ServerSettings, + remoteAddress: Optional[InetSocketAddress], + log: LoggingAdapter, + materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = + adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala, log)(materializer)) + + /** + * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding + * on the given `endpoint`. + * + * If the given port is 0 the resulting source can be materialized several times. Each materialization will + * then be assigned a new local port by the operating system, which can then be retrieved by the materialized + * [[ServerBinding]]. + * + * If the given port is non-zero subsequent materialization attempts of the produced source will immediately + * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized + * [[ServerBinding]]. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bind(connect: ConnectHttp, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + new Source(delegate.bind(connect.host, connect.port, connectionContext)(materializer) + .map(new IncomingConnection(_)) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + } + + /** + * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding + * on the given `endpoint`. + * + * If the given port is 0 the resulting source can be materialized several times. Each materialization will + * then be assigned a new local port by the operating system, which can then be retrieved by the materialized + * [[ServerBinding]]. + * + * If the given port is non-zero subsequent materialization attempts of the produced source will immediately + * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized + * [[ServerBinding]]. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bind( + connect: ConnectHttp, + settings: ServerSettings, + materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + new Source(delegate.bind(connect.host, connect.port, settings = settings.asScala, connectionContext = connectionContext)(materializer) + .map(new IncomingConnection(_)) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + } + + /** + * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding + * on the given `endpoint`. + * + * If the given port is 0 the resulting source can be materialized several times. Each materialization will + * then be assigned a new local port by the operating system, which can then be retrieved by the materialized + * [[ServerBinding]]. + * + * If the given port is non-zero subsequent materialization attempts of the produced source will immediately + * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized + * [[ServerBinding]]. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bind( + connect: ConnectHttp, + settings: ServerSettings, + log: LoggingAdapter, + materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + new Source(delegate.bind(connect.host, connect.port, connectionContext, settings.asScala, log)(materializer) + .map(new IncomingConnection(_)) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + } + + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bindAndHandle( + handler: Flow[HttpRequest, HttpResponse, _], + connect: ConnectHttp, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandle( + handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, + connect.host, connect.port, connectionContext)(materializer) + .map(new ServerBinding(_))(ec).toJava + } + + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bindAndHandle( + handler: Flow[HttpRequest, HttpResponse, _], + connect: ConnectHttp, + settings: ServerSettings, + log: LoggingAdapter, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandle( + handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, + connect.host, connect.port, connectionContext, settings.asScala, log)(materializer) + .map(new ServerBinding(_))(ec).toJava + } + + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bindAndHandleSync( + handler: Function[HttpRequest, HttpResponse], + connect: ConnectHttp, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandleSync(handler.apply(_).asScala, connect.host, connect.port, connectionContext)(materializer) + .map(new ServerBinding(_))(ec).toJava + } + + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bindAndHandleSync( + handler: Function[HttpRequest, HttpResponse], + connect: ConnectHttp, + settings: ServerSettings, + log: LoggingAdapter, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandleSync( + handler.apply(_).asScala, + connect.host, connect.port, connectionContext, settings.asScala, log)(materializer) + .map(new ServerBinding(_))(ec).toJava + } + + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bindAndHandleAsync( + handler: Function[HttpRequest, CompletionStage[HttpResponse]], + connect: ConnectHttp, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandleAsync(handler.apply(_).toScala, connect.host, connect.port, connectionContext)(materializer) + .map(new ServerBinding(_))(ec).toJava + } + + /** + * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` + * [[Flow]] for processing all incoming connections. + * + * The number of concurrently accepted connections can be configured by overriding + * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. + */ + def bindAndHandleAsync( + handler: Function[HttpRequest, CompletionStage[HttpResponse]], + connect: ConnectHttp, + settings: ServerSettings, + parallelism: Int, log: LoggingAdapter, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandleAsync( + handler.apply(_).toScala, + connect.host, connect.port, connectionContext, settings.asScala, parallelism, log)(materializer) + .map(new ServerBinding(_))(ec).toJava + } + + /** + * Constructs a client layer stage using the configured default [[akka.http.javadsl.settings.ClientConnectionSettings]]. + */ + def clientLayer(hostHeader: headers.Host): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] = + adaptClientLayer(delegate.clientLayer(JavaMapping.toScala(hostHeader))) + + /** + * Constructs a client layer stage using the given [[akka.http.javadsl.settings.ClientConnectionSettings]]. + */ + def clientLayer( + hostHeader: headers.Host, + settings: ClientConnectionSettings): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] = + adaptClientLayer(delegate.clientLayer(JavaMapping.toScala(hostHeader), settings.asScala)) + + /** + * Constructs a client layer stage using the given [[ClientConnectionSettings]]. + */ + def clientLayer( + hostHeader: headers.Host, + settings: ClientConnectionSettings, + log: LoggingAdapter): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] = + adaptClientLayer(delegate.clientLayer(JavaMapping.toScala(hostHeader), settings.asScala, log)) + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + * + * If the hostname is given with an `https://` prefix, the default [[HttpsConnectionContext]] will be used. + */ + def outgoingConnection(host: String): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] = + outgoingConnection(ConnectHttp.toHost(host)) + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + * + * Use the [[ConnectHttp]] DSL to configure target host and whether HTTPS should be used. + */ + def outgoingConnection(to: ConnectHttp): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] = + adaptOutgoingFlow { + if (to.isHttps) delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala) + else delegate.outgoingConnection(to.host, to.port) + } + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + */ + def outgoingConnection( + to: ConnectHttp, + localAddress: Optional[InetSocketAddress], + settings: ClientConnectionSettings, + log: LoggingAdapter): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] = + adaptOutgoingFlow { + if (to.isHttps) + delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asInstanceOf[HttpsConnectionContext].asScala, localAddress.asScala, settings.asScala, log) + else + delegate.outgoingConnection(to.host, to.port, localAddress.asScala, settings.asScala, log) + } + + /** + * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches + * the requests from all its materializations across this pool. + * While the started host connection pool internally shuts itself down automatically after the configured idle + * timeout it will spin itself up again if more requests arrive from an existing or a new client flow + * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def newHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + newHostConnectionPool[T](ConnectHttp.toHost(host), materializer) + + /** + * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches + * the requests from all its materializations across this pool. + * While the started host connection pool internally shuts itself down automatically after the configured idle + * timeout it will spin itself up again if more requests arrive from an existing or a new client flow + * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def newHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPool[T](to.host, to.port)(materializer).mapMaterializedValue(_.toJava)) + + /** + * Same as [[newHostConnectionPool]] but with HTTPS encryption. + * + * The given [[ConnectionContext]] will be used for encryption on the connection. + */ + def newHostConnectionPool[T]( + to: ConnectHttp, + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow { + to.effectiveHttpsConnectionContext(defaultClientHttpsContext) match { + case https: HttpsConnectionContext ⇒ + delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer) + .mapMaterializedValue(_.toJava) + case _ ⇒ + delegate.newHostConnectionPool[T](to.host, to.port, settings.asScala, log)(materializer) + .mapMaterializedValue(_.toJava) + } + } + + /** + * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing + * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool + * configuration a separate connection pool is maintained. + * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. + * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. + * + * The internal caching logic guarantees that there will never be more than a single pool running for the + * given target host endpoint and configuration (in this ActorSystem). + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def cachedHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + cachedHostConnectionPool(ConnectHttp.toHost(host), materializer) + + /** + * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing + * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool + * configuration a separate connection pool is maintained. + * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. + * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. + * + * The internal caching logic guarantees that there will never be more than a single pool running for the + * given target host endpoint and configuration (in this ActorSystem). + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def cachedHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPool[T](to.host, to.port)(materializer).mapMaterializedValue(_.toJava)) + + /** + * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. + * + * The given [[ConnectionContext]] will be used for encryption on the connection. + */ + def cachedHostConnectionPool[T]( + to: ConnectHttp, + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPoolHttps[T](to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala, settings.asScala, log)(materializer) + .mapMaterializedValue(_.toJava)) + + /** + * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool + * depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or + * a valid `Host` header. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests `A` and `B` enter the flow in that order the response for `B` might be produced before the + * response for `A`. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] = + adaptTupleFlow(delegate.superPool[T]()(materializer)) + + /** + * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool + * depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or + * a valid `Host` header. + * + * The given [[HttpsConnectionContext]] is used to configure TLS for the connection. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the + * response for `A`. + * + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def superPool[T]( + settings: ConnectionPoolSettings, + connectionContext: HttpsConnectionContext, + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] = + adaptTupleFlow(delegate.superPool[T](connectionContext.asScala, settings.asScala, log)(materializer)) + + /** + * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool + * depending on their respective effective URIs. Note that incoming requests must have either an absolute URI or + * a valid `Host` header. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests `A` and `B` enter the `flow` in that order the response for `B` might be produced before the + * response for `A`. + * + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type `T` from the application which is emitted together with the corresponding response. + */ + def superPool[T]( + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] = + adaptTupleFlow(delegate.superPool[T](defaultClientHttpsContext.asScala, settings.asScala, log)(materializer)) + + /** + * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's + * effective URI to produce a response future. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + * + * Note that the request must have either an absolute URI or a valid `Host` header, otherwise + * the future will be completed with an error. + */ + def singleRequest(request: HttpRequest, materializer: Materializer): CompletionStage[HttpResponse] = + delegate.singleRequest(request.asScala)(materializer).toJava + + /** + * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's + * effective URI to produce a response future. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + * + * Note that the request must have either an absolute URI or a valid `Host` header, otherwise + * the future will be completed with an error. + */ + def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext, materializer: Materializer): CompletionStage[HttpResponse] = + delegate.singleRequest(request.asScala, connectionContext.asScala)(materializer).toJava + + /** + * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's + * effective URI to produce a response future. + * + * The given [[HttpsConnectionContext]] will be used for encruption if the request is sent to an https endpoint. + * + * Note that the request must have either an absolute URI or a valid `Host` header, otherwise + * the future will be completed with an error. + */ + def singleRequest( + request: HttpRequest, + connectionContext: HttpsConnectionContext, + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: Materializer): CompletionStage[HttpResponse] = + delegate.singleRequest(request.asScala, connectionContext.asScala, settings.asScala, log)(materializer).toJava + + /** + * Constructs a WebSocket [[BidiFlow]]. + * + * The layer is not reusable and must only be materialized once. + */ + def webSocketClientLayer(request: WebSocketRequest): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] = + adaptWsBidiFlow(delegate.webSocketClientLayer(request.asScala)) + + /** + * Constructs a WebSocket [[BidiFlow]] using the configured default [[ClientConnectionSettings]], + * configured using the `akka.http.client` config section. + * + * The layer is not reusable and must only be materialized once. + */ + def webSocketClientLayer( + request: WebSocketRequest, + settings: ClientConnectionSettings): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] = + adaptWsBidiFlow(delegate.webSocketClientLayer(request.asScala, settings.asScala)) + + /** + * Constructs a WebSocket [[BidiFlow]] using the configured default [[ClientConnectionSettings]], + * configured using the `akka.http.client` config section. + * + * The layer is not reusable and must only be materialized once. + */ + def webSocketClientLayer( + request: WebSocketRequest, + settings: ClientConnectionSettings, + log: LoggingAdapter): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] = + adaptWsBidiFlow(delegate.webSocketClientLayer(request.asScala, settings.asScala, log)) + + /** + * Constructs a flow that once materialized establishes a WebSocket connection to the given Uri. + * + * The layer is not reusable and must only be materialized once. + */ + def webSocketClientFlow(request: WebSocketRequest): Flow[Message, Message, CompletionStage[WebSocketUpgradeResponse]] = + adaptWsFlow { + delegate.webSocketClientFlow(request.asScala) + } + + /** + * Constructs a flow that once materialized establishes a WebSocket connection to the given Uri. + * + * The layer is not reusable and must only be materialized once. + */ + def webSocketClientFlow( + request: WebSocketRequest, + connectionContext: ConnectionContext, + localAddress: Optional[InetSocketAddress], + settings: ClientConnectionSettings, + log: LoggingAdapter): Flow[Message, Message, CompletionStage[WebSocketUpgradeResponse]] = + adaptWsFlow { + delegate.webSocketClientFlow(request.asScala, connectionContext.asScala, localAddress.asScala, settings.asScala, log) + } + + /** + * Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the + * WebSocket conversation. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + */ + def singleWebSocketRequest[T]( + request: WebSocketRequest, + clientFlow: Flow[Message, Message, T], + materializer: Materializer): Pair[CompletionStage[WebSocketUpgradeResponse], T] = + adaptWsResultTuple { + delegate.singleWebSocketRequest( + request.asScala, + adaptWsFlow[T](clientFlow))(materializer) + } + + /** + * Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the + * WebSocket conversation. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + */ + def singleWebSocketRequest[T]( + request: WebSocketRequest, + clientFlow: Flow[Message, Message, T], + connectionContext: ConnectionContext, + materializer: Materializer): Pair[CompletionStage[WebSocketUpgradeResponse], T] = + adaptWsResultTuple { + delegate.singleWebSocketRequest( + request.asScala, + adaptWsFlow[T](clientFlow), + connectionContext.asScala)(materializer) + } + + /** + * Runs a single WebSocket conversation given a Uri and a flow that represents the client side of the + * WebSocket conversation. + */ + def singleWebSocketRequest[T]( + request: WebSocketRequest, + clientFlow: Flow[Message, Message, T], + connectionContext: ConnectionContext, + localAddress: Optional[InetSocketAddress], + settings: ClientConnectionSettings, + log: LoggingAdapter, + materializer: Materializer): Pair[CompletionStage[WebSocketUpgradeResponse], T] = + adaptWsResultTuple { + delegate.singleWebSocketRequest( + request.asScala, + adaptWsFlow[T](clientFlow), + connectionContext.asScala, + localAddress.asScala, + settings.asScala, + log)(materializer) + } + + /** + * Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]]. + * The returned future is completed when all pools that were live at the time of this method call + * have completed their shutdown process. + * + * If existing pool client flows are re-used or new ones materialized concurrently with or after this + * method call the respective connection pools will be restarted and not contribute to the returned future. + */ + def shutdownAllConnectionPools(): CompletionStage[Unit] = delegate.shutdownAllConnectionPools().toJava + + /** + * Gets the current default server-side [[ConnectionContext]] – defaults to plain HTTP. + * Can be modified using [[setDefaultServerHttpContext]], and will then apply for servers bound after that call has completed. + */ + def defaultServerHttpContext: ConnectionContext = + delegate.defaultServerHttpContext + + /** + * Sets the default server-side [[ConnectionContext]]. + * If it is an instance of [[HttpsConnectionContext]] then the server will be bound using HTTPS. + */ + def setDefaultServerHttpContext(context: ConnectionContext): Unit = + delegate.setDefaultServerHttpContext(context.asScala) + + /** + * Gets the current default client-side [[ConnectionContext]]. + */ + def defaultClientHttpsContext: akka.http.javadsl.HttpsConnectionContext = delegate.defaultClientHttpsContext + + /** + * Sets the default client-side [[ConnectionContext]]. + */ + def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit = + delegate.setDefaultClientHttpsContext(context.asScala) + + def createServerHttpsContext(sslConfig: AkkaSSLConfig): HttpsConnectionContext = + delegate.createServerHttpsContext(sslConfig) + + def createClientHttpsContext(sslConfig: AkkaSSLConfig): HttpsConnectionContext = + delegate.createClientHttpsContext(sslConfig) + + def createDefaultClientHttpsContext(): HttpsConnectionContext = + delegate.createDefaultClientHttpsContext() + + private def adaptTupleFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = { + implicit val _ = JavaMapping.identity[T] + JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat]) + } + + private def adaptOutgoingFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[scaladsl.model.HttpRequest, scaladsl.model.HttpResponse, Future[scaladsl.Http.OutgoingConnection]]): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] = + Flow.fromGraph { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(scalaFlow)(Keep.right) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava) + } + + private def adaptServerLayer(serverLayer: scaladsl.Http.ServerLayer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = + new BidiFlow( + JavaMapping.adapterBidiFlow[HttpResponse, sm.HttpResponse, sm.HttpRequest, HttpRequest] + .atop(serverLayer)) + + private def adaptClientLayer(clientLayer: scaladsl.Http.ClientLayer): BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, NotUsed] = + new BidiFlow( + JavaMapping.adapterBidiFlow[HttpRequest, sm.HttpRequest, sm.HttpResponse, HttpResponse] + .atop(clientLayer)) + + private def adaptWsBidiFlow(wsLayer: scaladsl.Http.WebSocketClientLayer): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, CompletionStage[WebSocketUpgradeResponse]] = + new BidiFlow( + JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message] + .atopMat(wsLayer)((_, s) ⇒ adaptWsUpgradeResponse(s))) + + private def adaptWsFlow(wsLayer: stream.scaladsl.Flow[sm.ws.Message, sm.ws.Message, Future[scaladsl.model.ws.WebSocketUpgradeResponse]]): Flow[Message, Message, CompletionStage[WebSocketUpgradeResponse]] = + Flow.fromGraph(JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message].joinMat(wsLayer)(Keep.right).mapMaterializedValue(adaptWsUpgradeResponse _)) + + private def adaptWsFlow[Mat](javaFlow: Flow[Message, Message, Mat]): stream.scaladsl.Flow[scaladsl.model.ws.Message, scaladsl.model.ws.Message, Mat] = + stream.scaladsl.Flow[scaladsl.model.ws.Message] + .map(Message.adapt) + .viaMat(javaFlow.asScala)(Keep.right) + .map(_.asScala) + + private def adaptWsResultTuple[T](result: (Future[scaladsl.model.ws.WebSocketUpgradeResponse], T)): Pair[CompletionStage[WebSocketUpgradeResponse], T] = + result match { + case (fut, tMat) ⇒ Pair(adaptWsUpgradeResponse(fut), tMat) + } + private def adaptWsUpgradeResponse(responseFuture: Future[scaladsl.model.ws.WebSocketUpgradeResponse]): CompletionStage[WebSocketUpgradeResponse] = + responseFuture.map(WebSocketUpgradeResponse.adapt)(system.dispatcher).toJava +} diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala index 432663ba52..c30f270349 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala @@ -4,8 +4,7 @@ package akka.event.slf4j -import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory } -import org.slf4j.MDC +import org.slf4j.{ MDC, Marker, MarkerFactory, Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory } import akka.event.Logging._ import akka.actor._ import akka.event.DummyClassForStringSources @@ -67,19 +66,27 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg case event @ Error(cause, logSource, logClass, message) ⇒ withMdc(logSource, event) { cause match { - case Error.NoCause | null ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else null) - case _ ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause) + case Error.NoCause | null ⇒ + Logger(logClass, logSource).error(markerIfPresent(event), if (message != null) message.toString else null) + case _ ⇒ + Logger(logClass, logSource).error(markerIfPresent(event), if (message != null) message.toString else cause.getLocalizedMessage, cause) } } case event @ Warning(logSource, logClass, message) ⇒ - withMdc(logSource, event) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } + withMdc(logSource, event) { + Logger(logClass, logSource).warn(markerIfPresent(event), "{}", message.asInstanceOf[AnyRef]) + } case event @ Info(logSource, logClass, message) ⇒ - withMdc(logSource, event) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } + withMdc(logSource, event) { + Logger(logClass, logSource).info(markerIfPresent(event), "{}", message.asInstanceOf[AnyRef]) + } case event @ Debug(logSource, logClass, message) ⇒ - withMdc(logSource, event) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } + withMdc(logSource, event) { + Logger(logClass, logSource).debug(markerIfPresent(event), "{}", message.asInstanceOf[AnyRef]) + } case InitializeLogger(_) ⇒ log.info("Slf4jLogger started") @@ -102,6 +109,12 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg } } + private final def markerIfPresent(event: LogEvent): Marker = + event match { + case m: LogEventWithMarker ⇒ MarkerFactory.getMarker(m.marker.name) + case _ ⇒ null + } + /** * Override this method to provide a differently formatted timestamp * @param timestamp a "currentTimeMillis"-obtained timestamp diff --git a/akka-slf4j/src/test/resources/logback-test.xml b/akka-slf4j/src/test/resources/logback-test.xml index 5a4112faf3..93b8167d6a 100644 --- a/akka-slf4j/src/test/resources/logback-test.xml +++ b/akka-slf4j/src/test/resources/logback-test.xml @@ -7,7 +7,7 @@ - %date{ISO8601} level=[%level] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n + %date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n - \ No newline at end of file + diff --git a/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala b/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala index e10d1a919f..93b29427e2 100644 --- a/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala +++ b/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala @@ -4,13 +4,14 @@ package akka.event.slf4j import language.postfixOps - import akka.testkit.AkkaSpec -import akka.actor.{ DiagnosticActorLogging, Actor, Props } +import akka.actor.{ Actor, DiagnosticActorLogging, Props } + import scala.concurrent.duration._ -import akka.event.Logging +import akka.event.{ LogMarker, Logging } import ch.qos.logback.core.OutputStreamAppender import java.io.ByteArrayOutputStream + import org.scalatest.BeforeAndAfterEach object Slf4jLoggerSpec { @@ -25,9 +26,12 @@ object Slf4jLoggerSpec { } """ - case class StringWithMDC(s: String, mdc: Map[String, Any]) + final case class StringWithMDC(s: String, mdc: Map[String, Any]) + final case class StringWithMarker(s: String, marker: LogMarker) - class LogProducer extends Actor with DiagnosticActorLogging { + final class LogProducer extends Actor with DiagnosticActorLogging { + + val markLog = Logging.withMarker(this) def receive = { case e: Exception ⇒ @@ -38,6 +42,8 @@ object Slf4jLoggerSpec { log.mdc(mdc) log.info(s) log.clearMDC() + case StringWithMarker(s, marker) ⇒ + markLog.info(marker, s) } } @@ -95,6 +101,15 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft s should include("msg=[test x=3 y=17]") } + "log info with marker" in { + producer ! StringWithMarker("security-wise interesting message", LogMarker("SECURITY")) + + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s should include("marker=[SECURITY]") + s should include("msg=[security-wise interesting message]") + } + "put custom MDC values when specified" in { producer ! StringWithMDC("Message with custom MDC values", Map("ticketNumber" → 3671, "ticketDesc" → "Custom MDC Values")) diff --git a/akka-ssl-config/src/main/scala/com/lightbend/sslconfig/akka/SSLEngineConfigurator.scala b/akka-ssl-config/src/main/scala/com/lightbend/sslconfig/akka/SSLEngineConfigurator.scala new file mode 100644 index 0000000000..307660e1bf --- /dev/null +++ b/akka-ssl-config/src/main/scala/com/lightbend/sslconfig/akka/SSLEngineConfigurator.scala @@ -0,0 +1,15 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package com.lightbend.sslconfig.akka + +import javax.net.ssl.{ SSLContext, SSLEngine } + +/** + * Gives the chance to configure the SSLContext before it is going to be used. + * The passed in context will be already set in client mode and provided with hostInfo during initialization. + */ +trait SSLEngineConfigurator { + def configure(engine: SSLEngine, sslContext: SSLContext): SSLEngine +} diff --git a/akka-ssl-config/src/main/scala/com/lightbend/sslconfig/akka/util/AkkaLoggerFactory.scala b/akka-ssl-config/src/main/scala/com/lightbend/sslconfig/akka/util/AkkaLoggerFactory.scala new file mode 100644 index 0000000000..959468c3a6 --- /dev/null +++ b/akka-ssl-config/src/main/scala/com/lightbend/sslconfig/akka/util/AkkaLoggerFactory.scala @@ -0,0 +1,12 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package com.lightbend.sslconfig.akka.util + +/** INTERNAL API */ +final class AkkaLoggerFactory(system: ActorSystem) extends LoggerFactory { + override def apply(clazz: Class[_]): NoDepsLogger = new AkkaLoggerBridge(system.eventStream, clazz) + + override def apply(name: String): NoDepsLogger = new AkkaLoggerBridge(system.eventStream, name, classOf[DummyClassForStringSources]) +}