diff --git a/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala new file mode 100644 index 0000000000..e0ba90b1f9 --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor + +import java.io.File + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions +import com.typesafe.config.ConfigResolveOptions + +@deprecated("use ActorSystem instead", "2.0") +object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) { + start() +} + +/** + * Loads configuration (akka.conf) from same location as Akka 1.x + */ +@deprecated("use default config location or write your own configuration loader", "2.0") +object OldConfigurationLoader { + + val defaultConfig: Config = { + val cfg = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig + val config = cfg.withFallback(ConfigFactory.defaultReference) + config.checkValid(ConfigFactory.defaultReference, "akka") + config + } + + // file extensions (.conf, .json, .properties), are handled by parseFileAnySyntax + val defaultLocation: String = (systemMode orElse envMode).map("akka." + _).getOrElse("akka") + + private def envMode = System.getenv("AKKA_MODE") match { + case null | "" ⇒ None + case value ⇒ Some(value) + } + + private def systemMode = System.getProperty("akka.mode") match { + case null | "" ⇒ None + case value ⇒ Some(value) + } + + private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false) + + private def fromProperties = try { + val property = Option(System.getProperty("akka.config")) + property.map(p ⇒ + ConfigFactory.systemProperties.withFallback( + ConfigFactory.parseFileAnySyntax(new File(p), configParseOptions))) + } catch { case _ ⇒ None } + + private def fromClasspath = try { + Option(ConfigFactory.systemProperties.withFallback( + ConfigFactory.parseResourcesAnySyntax(ActorSystem.getClass, "/" + defaultLocation, configParseOptions))) + } catch { case _ ⇒ None } + + private def fromHome = try { + Option(ConfigFactory.systemProperties.withFallback( + ConfigFactory.parseFileAnySyntax(new File(ActorSystem.GlobalHome.get + "/config/" + defaultLocation), configParseOptions))) + } catch { case _ ⇒ None } + + private def emptyConfig = ConfigFactory.systemProperties +} \ No newline at end of file diff --git a/akka-actor-migration/src/main/scala/akka/actor/OldActor.scala b/akka-actor-migration/src/main/scala/akka/actor/OldActor.scala new file mode 100644 index 0000000000..0a9238209e --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/actor/OldActor.scala @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor + +import akka.japi.Creator +import akka.util.Timeout +import akka.dispatch.Future +import akka.dispatch.OldFuture +import akka.util.Duration +import java.util.concurrent.TimeUnit +import java.net.InetSocketAddress + +/** + * Migration replacement for `object akka.actor.Actor`. + */ +@deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0") +object OldActor { + + /** + * Creates an ActorRef out of the Actor with type T. + * It will be automatically started, i.e. remove old call to `start()`. + * + */ + @deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0") + def actorOf[T <: Actor: Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + + /** + * Creates an ActorRef out of the Actor of the specified Class. + * It will be automatically started, i.e. remove old call to `start()`. + */ + @deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0") + def actorOf(clazz: Class[_ <: Actor]): ActorRef = GlobalActorSystem.actorOf(Props(clazz)) + + /** + * Creates an ActorRef out of the Actor. Allows you to pass in a factory function + * that creates the Actor. Please note that this function can be invoked multiple + * times if for example the Actor is supervised and needs to be restarted. + * + * It will be automatically started, i.e. remove old call to `start()`. + */ + @deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0") + def actorOf(factory: ⇒ Actor): ActorRef = GlobalActorSystem.actorOf(Props(factory)) + + /** + * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) + * that creates the Actor. Please note that this function can be invoked multiple + * times if for example the Actor is supervised and needs to be restarted. + *

+ * JAVA API + */ + @deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0") + def actorOf(creator: Creator[Actor]): ActorRef = GlobalActorSystem.actorOf(Props(creator)) + + @deprecated("OldActor.remote should not be used", "2.0") + lazy val remote: OldRemoteSupport = new OldRemoteSupport + +} + +@deprecated("use Actor", "2.0") +abstract class OldActor extends Actor { + + implicit def askTimeout: Timeout = akka.migration.askTimeout + + implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = akka.migration.future2OldFuture(future) + + implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef) + + @deprecated("Use context.become instead", "2.0") + def become(behavior: Receive, discardOld: Boolean = true) = context.become(behavior, discardOld) + + @deprecated("Use context.unbecome instead", "2.0") + def unbecome() = context.unbecome() + + class OldActorRef(actorRef: ActorRef) { + @deprecated("Actors are automatically started when creatd, i.e. remove old call to start()", "2.0") + def start(): ActorRef = actorRef + + @deprecated("Stop with ActorSystem or ActorContext instead", "2.0") + def exit() = stop() + + @deprecated("Stop with ActorSystem or ActorContext instead", "2.0") + def stop(): Unit = context.stop(actorRef) + + @deprecated("Use context.getReceiveTimeout instead", "2.0") + def getReceiveTimeout(): Option[Long] = context.receiveTimeout.map(_.toMillis) + + @deprecated("Use context.setReceiveTimeout instead", "2.0") + def setReceiveTimeout(timeout: Long) = context.setReceiveTimeout(Duration(timeout, TimeUnit.MILLISECONDS)) + + @deprecated("Use context.getReceiveTimeout instead", "2.0") + def receiveTimeout: Option[Long] = getReceiveTimeout() + + @deprecated("Use context.setReceiveTimeout instead", "2.0") + def receiveTimeout_=(timeout: Option[Long]) = setReceiveTimeout(timeout.getOrElse(0L)) + + @deprecated("Use self.isTerminated instead", "2.0") + def isShutdown: Boolean = self.isTerminated + + @deprecated("Use sender instead", "2.0") + def channel() = context.sender + + @deprecated("Use sender instead", "2.0") + def sender() = Some(context.sender) + + @deprecated("Use sender ! instead", "2.0") + def reply(message: Any) = context.sender.!(message, context.self) + + @deprecated("Use sender ! instead", "2.0") + def tryReply(message: Any): Boolean = { + reply(message) + true + } + + @deprecated("Use sender ! instead", "2.0") + def tryTell(message: Any)(implicit sender: ActorRef = context.self): Boolean = { + actorRef.!(message)(sender) + true + } + + @deprecated("Use sender ! akka.actor.Status.Failure(e) instead", "2.0") + def sendException(ex: Throwable): Boolean = { + context.sender.!(akka.actor.Status.Failure(ex), context.self) + true + } + } +} + +class OldRemoteSupport { + + @deprecated("remote.start is not needed", "2.0") + def start() {} + + @deprecated("remote.start is not needed, use configuration to specify RemoteActorRefProvider, host and port", "2.0") + def start(host: String, port: Int) {} + + @deprecated("remote.start is not needed, use configuration to specify RemoteActorRefProvider, host and port", "2.0") + def start(host: String, port: Int, loader: ClassLoader) {} + + @deprecated("remote.shutdown is not needed", "2.0") + def shutdown() {} + + @deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0") + def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = + GlobalActorSystem.actorFor("akka://%s@%s:%s/user/%s".format(GlobalActorSystem.name, hostname, port, classNameOrServiceId)) + + @deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0") + def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, hostname, port) + + @deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0") + def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(serviceId, hostname, port) + + @deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0") + def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(serviceId, hostname, port) + + @deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0") + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, hostname, port) + + @deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0") + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, hostname, port) + + @deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0") + def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(serviceId, hostname, port) + +} \ No newline at end of file diff --git a/akka-actor-migration/src/main/scala/akka/actor/OldScheduler.scala b/akka-actor-migration/src/main/scala/akka/actor/OldScheduler.scala new file mode 100644 index 0000000000..7b487bf5db --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/actor/OldScheduler.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor + +import java.util.concurrent.TimeUnit +import akka.util.Duration + +/** + * Migration replacement for `object akka.actor.Scheduler`. + */ +@deprecated("use ActorSystem.scheduler instead", "2.0") +object OldScheduler { + + /** + * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay + */ + @deprecated("use ActorSystem.scheduler instead", "2.0") + def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = + GlobalActorSystem.scheduler.schedule( + Duration(initialDelay, timeUnit), + Duration(delay, timeUnit), + receiver, + message) + + /** + * Schedules to run specified function to the receiver after initialDelay and then repeated after delay + */ + @deprecated("use ActorSystem.scheduler instead", "2.0") + def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = + GlobalActorSystem.scheduler.schedule( + Duration(initialDelay, timeUnit), + Duration(delay, timeUnit), + new Runnable { def run = f() }) + + /** + * Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay. + */ + @deprecated("use ActorSystem.scheduler instead", "2.0") + def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = + GlobalActorSystem.scheduler.schedule( + Duration(initialDelay, timeUnit), + Duration(delay, timeUnit), + runnable) + + /** + * Schedules to send the specified message to the receiver after delay + */ + @deprecated("use ActorSystem.scheduler instead", "2.0") + def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = + GlobalActorSystem.scheduler.scheduleOnce( + Duration(delay, timeUnit), + receiver, + message) + + /** + * Schedules a function to be run after delay. + */ + @deprecated("use ActorSystem.scheduler instead", "2.0") + def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable = + GlobalActorSystem.scheduler.scheduleOnce( + Duration(delay, timeUnit), + new Runnable { def run = f() }) + + /** + * Schedules a runnable to be run after delay, + */ + @deprecated("use ActorSystem.scheduler instead", "2.0") + def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = + GlobalActorSystem.scheduler.scheduleOnce( + Duration(delay, timeUnit), + runnable) + +} + diff --git a/akka-actor-migration/src/main/scala/akka/config/OldConfig.scala b/akka-actor-migration/src/main/scala/akka/config/OldConfig.scala new file mode 100644 index 0000000000..69d7e15d41 --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/config/OldConfig.scala @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.config +import akka.actor.GlobalActorSystem +import com.typesafe.config.Config + +/** + * Migration replacement for `object akka.config.Config`. + */ +@deprecated("use ActorSystem.settings.config instead", "2.0") +object OldConfig { + + val config = new OldConfiguration(GlobalActorSystem.settings.config) + +} + +/** + * Migration adapter for `akka.config.Configuration` + */ +@deprecated("use ActorSystem.settings.config (com.typesafe.config.Config) instead", "2.0") +class OldConfiguration(config: Config) { + + import scala.collection.JavaConverters._ + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def contains(key: String): Boolean = config.hasPath(key) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def keys: Iterable[String] = config.root.keySet.asScala + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getAny(key: String): Option[Any] = { + try { + Option(config.getAnyRef(key)) + } catch { + case _ ⇒ None + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getAny(key: String, defaultValue: Any): Any = getAny(key).getOrElse(defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getSeqAny(key: String): Seq[Any] = { + try { + config.getAnyRefList(key).asScala + } catch { + case _ ⇒ Seq.empty[Any] + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getString(key: String): Option[String] = + try { + Option(config.getString(key)) + } catch { + case _ ⇒ None + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getString(key: String, defaultValue: String): String = getString(key).getOrElse(defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getList(key: String): Seq[String] = { + try { + config.getStringList(key).asScala + } catch { + case _ ⇒ Seq.empty[String] + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getInt(key: String): Option[Int] = { + try { + Option(config.getInt(key)) + } catch { + case _ ⇒ None + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getInt(key: String, defaultValue: Int): Int = getInt(key).getOrElse(defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getLong(key: String): Option[Long] = { + try { + Option(config.getLong(key)) + } catch { + case _ ⇒ None + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getLong(key: String, defaultValue: Long): Long = getLong(key).getOrElse(defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getFloat(key: String): Option[Float] = { + try { + Option(config.getDouble(key).toFloat) + } catch { + case _ ⇒ None + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getFloat(key: String, defaultValue: Float): Float = getFloat(key).getOrElse(defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getDouble(key: String): Option[Double] = { + try { + Option(config.getDouble(key)) + } catch { + case _ ⇒ None + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getDouble(key: String, defaultValue: Double): Double = getDouble(key).getOrElse(defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getBoolean(key: String): Option[Boolean] = { + try { + Option(config.getBoolean(key)) + } catch { + case _ ⇒ None + } + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getBoolean(key: String, defaultValue: Boolean): Boolean = getBoolean(key).getOrElse(defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getBool(key: String): Option[Boolean] = getBoolean(key) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getBool(key: String, defaultValue: Boolean): Boolean = getBoolean(key, defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def apply(key: String): String = getString(key) match { + case None ⇒ throw new ConfigurationException("undefined config: " + key) + case Some(v) ⇒ v + } + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def apply(key: String, defaultValue: String) = getString(key, defaultValue) + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def apply(key: String, defaultValue: Int) = getInt(key, defaultValue) + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def apply(key: String, defaultValue: Long) = getLong(key, defaultValue) + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def apply(key: String, defaultValue: Boolean) = getBool(key, defaultValue) + + @deprecated("use new com.typesafe.config.Config API instead", "2.0") + def getSection(name: String): Option[OldConfiguration] = { + try { + Option(new OldConfiguration(config.getConfig(name))) + } catch { + case _ ⇒ None + } + } +} \ No newline at end of file diff --git a/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala b/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala new file mode 100644 index 0000000000..f53a3dd11b --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.dispatch + +import java.util.concurrent.TimeoutException +import akka.util.duration._ +import akka.AkkaException +import akka.util.BoxedType +import akka.util.Duration +import akka.actor.GlobalActorSystem + +/** + * Some old methods made available through implicit conversion in + * [[akka.migration]]. + */ +@deprecated("use new Future api instead", "2.0") +class OldFuture[T](future: Future[T]) { + + @deprecated("use akka.dispatch.Await.result instead", "2.0") + def get: T = try { + Await.result(future, GlobalActorSystem.settings.ActorTimeout.duration) + } catch { + case e: TimeoutException ⇒ throw new FutureTimeoutException(e.getMessage, e) + } + + @deprecated("use akka.dispatch.Await.ready instead", "2.0") + def await: Future[T] = await(GlobalActorSystem.settings.ActorTimeout.duration) + + @deprecated("use akka.dispatch.Await.ready instead", "2.0") + def await(atMost: Duration) = try { + Await.ready(future, atMost) + future + } catch { + case e: TimeoutException ⇒ throw new FutureTimeoutException(e.getMessage, e) + } + + @deprecated("use new Future api instead", "2.0") + def as[A](implicit m: Manifest[A]): Option[A] = { + try await catch { case _: FutureTimeoutException ⇒ } + future.value match { + case None ⇒ None + case Some(Left(ex)) ⇒ throw ex + case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) + } + } + + @deprecated("use new Future api instead", "2.0") + def asSilently[A](implicit m: Manifest[A]): Option[A] = { + try await catch { case _: FutureTimeoutException ⇒ } + future.value match { + case None ⇒ None + case Some(Left(ex)) ⇒ throw ex + case Some(Right(v)) ⇒ + try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) + catch { case _: ClassCastException ⇒ None } + } + } + +} + +@deprecated("Await throws java.util.concurrent.TimeoutException", "2.0") +class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { + def this(message: String) = this(message, null) +} \ No newline at end of file diff --git a/akka-actor-migration/src/main/scala/akka/event/OldEventHandler.scala b/akka-actor-migration/src/main/scala/akka/event/OldEventHandler.scala new file mode 100644 index 0000000000..ef5846bc5c --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/event/OldEventHandler.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.event + +import akka.actor.GlobalActorSystem + +/** + * Migration replacement for `akka.event.EventHandler` + */ +@deprecated("use Logging instead", "2.0") +object OldEventHandler { + + @deprecated("use Logging instead", "2.0") + def error(cause: Throwable, instance: AnyRef, message: ⇒ String) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isErrorEnabled) log.error(cause, message) + } + + @deprecated("use Logging instead", "2.0") + def error(cause: Throwable, instance: AnyRef, message: Any) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isErrorEnabled) log.error(cause, message.toString) + } + + @deprecated("use Logging instead", "2.0") + def error(instance: AnyRef, message: ⇒ String) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isErrorEnabled) log.error(message.toString) + } + + @deprecated("use Logging instead", "2.0") + def error(instance: AnyRef, message: Any) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isErrorEnabled) log.error(message.toString) + } + + @deprecated("use Logging instead", "2.0") + def warning(instance: AnyRef, message: ⇒ String) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isWarningEnabled) log.warning(message) + } + + @deprecated("use Logging instead", "2.0") + def warning(instance: AnyRef, message: Any) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isWarningEnabled) log.warning(message.toString) + } + + @deprecated("use Logging instead", "2.0") + def info(instance: AnyRef, message: ⇒ String) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isInfoEnabled) log.info(message) + } + + @deprecated("use Logging instead", "2.0") + def info(instance: AnyRef, message: Any) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isInfoEnabled) log.info(message.toString) + } + + @deprecated("use Logging instead", "2.0") + def debug(instance: AnyRef, message: ⇒ String) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isDebugEnabled) log.debug(message) + } + + @deprecated("use Logging instead", "2.0") + def debug(instance: AnyRef, message: Any) { + val log = Logging.getLogger(GlobalActorSystem, instance) + if (log.isDebugEnabled) log.debug(message.toString) + } + + @deprecated("use Logging instead", "2.0") + def isInfoEnabled = Logging.getLogger(GlobalActorSystem, this).isInfoEnabled + + @deprecated("use Logging instead", "2.0") + def isDebugEnabled = Logging.getLogger(GlobalActorSystem, this).isDebugEnabled + +} diff --git a/akka-actor-migration/src/main/scala/akka/migration/package.scala b/akka-actor-migration/src/main/scala/akka/migration/package.scala new file mode 100644 index 0000000000..319fdd997e --- /dev/null +++ b/akka-actor-migration/src/main/scala/akka/migration/package.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka + +import akka.dispatch.Future +import akka.dispatch.OldFuture +import akka.util.Timeout +import akka.actor.GlobalActorSystem +import akka.dispatch.MessageDispatcher +import akka.actor.ActorRef + +package object migration { + + implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = new OldFuture[T](future) + + implicit def askTimeout: Timeout = GlobalActorSystem.settings.ActorTimeout + + implicit def defaultDispatcher: MessageDispatcher = GlobalActorSystem.dispatcher + + implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef) + + class OldActorRef(actorRef: ActorRef) { + @deprecated("Actors are automatically started when creatd, i.e. remove old call to start()", "2.0") + def start(): ActorRef = actorRef + + @deprecated("Stop with ActorSystem or ActorContext instead", "2.0") + def exit() = stop() + + @deprecated("Stop with ActorSystem or ActorContext instead", "2.0") + def stop(): Unit = GlobalActorSystem.stop(actorRef) + } + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 30828c1014..cd6dc58129 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -33,19 +33,26 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } "notify with one Terminated message when an Actor is stopped" in { - val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) - startWatching(terminal) - - testActor ! "ping" - expectMsg("ping") + val terminal = system.actorOf(Props.empty) + startWatching(terminal) ! "hallo" + expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill terminal ! PoisonPill expectTerminationOf(terminal) } + "notify with one Terminated message when an Actor is already dead" in { + val terminal = system.actorOf(Props.empty) + + terminal ! PoisonPill + + startWatching(terminal) + expectTerminationOf(terminal) + } + "notify with all monitors with one Terminated message when an Actor is stopped" in { - val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) + val terminal = system.actorOf(Props.empty) val monitor1, monitor2, monitor3 = startWatching(terminal) terminal ! PoisonPill @@ -60,7 +67,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { - val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) + val terminal = system.actorOf(Props.empty) val monitor1, monitor3 = startWatching(terminal) val monitor2 = system.actorOf(Props(new Actor { context.watch(terminal) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index a6d6a7df98..67c94d3dd7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -171,7 +171,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im system.eventStream.subscribe(testActor, classOf[Logging.Error]) fsm ! "go" expectMsgPF(1 second, hint = "Next state 2 does not exist") { - case Logging.Error(_, `name`, "Next state 2 does not exist") ⇒ true + case Logging.Error(_, `name`, _, "Next state 2 does not exist") ⇒ true } system.eventStream.unsubscribe(testActor) } @@ -218,18 +218,19 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } }) val name = fsm.path.toString + val fsmClass = fsm.underlyingActor.getClass system.eventStream.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true + case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true } - expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown")) - expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2")) + expectMsg(1 second, Logging.Debug(name, fsmClass, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, Logging.Debug(name, fsmClass, "transition 1 -> 2")) fsm ! "stop" expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true + case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true } - expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal) + expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), Normal) expectNoMsg(1 second) system.eventStream.unsubscribe(testActor) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 9debbd053c..fb75ab5593 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -151,7 +151,7 @@ object ActorModelSpec { await(deadline)(stops == dispatcher.stops.get) } catch { case e ⇒ - system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get + + system.eventStream.publish(Error(e, dispatcher.toString, dispatcher.getClass, "actual: stops=" + dispatcher.stops.get + " required: stops=" + stops)) throw e } @@ -208,9 +208,12 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - system.eventStream.publish(Error(e, Option(dispatcher).toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + - ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + - ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) + system.eventStream.publish(Error(e, + Option(dispatcher).toString, + (Option(dispatcher) getOrElse this).getClass, + "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) throw e } } @@ -311,7 +314,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa try { f } catch { - case e ⇒ system.eventStream.publish(Error(e, "spawn", "error in spawned thread")) + case e ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index f6e5b92201..9a41c80f6d 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -108,7 +108,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { import Logging._ - val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error")) + val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error")) val msg = allmsg filter (_.level <= level) allmsg foreach bus.publish msg foreach (x ⇒ expectMsg(x)) diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 15f8646d4b..bcfb9c391b 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -59,7 +59,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd } val log = LoggingReceive("funky")(r) log.isDefinedAt("hallo") - expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo")) + expectMsg(1 second, Logging.Debug("funky", classOf[DummyClassForStringSources], "received unhandled message hallo")) } } @@ -83,7 +83,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val name = actor.path.toString actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(name, "received handled message buh")) + expectMsg(Logging.Debug(name, actor.underlyingActor.getClass, "received handled message buh")) expectMsg("x") } @@ -109,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd }) actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(actor.path.toString, "received handled message buh")) + expectMsg(Logging.Debug(actor.path.toString, actor.underlyingActor.getClass, "received handled message buh")) expectMsg("x") } } @@ -130,7 +130,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val name = actor.path.toString actor ! PoisonPill expectMsgPF() { - case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true + case Logging.Debug(`name`, _, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true } awaitCond(actor.isTerminated, 100 millis) } @@ -142,7 +142,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val sys = impl.systemGuardian.path.toString ignoreMute(this) ignoreMsg { - case Logging.Debug(s, _) ⇒ s.contains("MainBusReaper") || s == sys + case Logging.Debug(`sys`, _, _) ⇒ true } system.eventStream.subscribe(testActor, classOf[Logging.Debug]) system.eventStream.subscribe(testActor, classOf[Logging.Error]) @@ -151,51 +151,53 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val lname = lifecycleGuardian.path.toString val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) val sname = supervisor.path.toString + val sclass = classOf[TestLogActor] val supervisorSet = receiveWhile(messages = 2) { - case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`sname`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`lname`, _, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`sname`, `sclass`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)") val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none") val aname = actor.path.toString + val aclass = classOf[TestLogActor] val set = receiveWhile(messages = 2) { - case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`aname`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`sname`, _, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`aname`, `aclass`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2), set + " was not Set(1, 2)") supervisor watch actor expectMsgPF(hint = "now monitoring") { - case Logging.Debug(ref, msg: String) ⇒ + case Logging.Debug(ref, `sclass`, msg: String) ⇒ ref == supervisor.underlyingActor && msg.startsWith("now monitoring") } supervisor unwatch actor expectMsgPF(hint = "stopped monitoring") { - case Logging.Debug(ref, msg: String) ⇒ + case Logging.Debug(ref, `sclass`, msg: String) ⇒ ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring") } EventFilter[ActorKilledException](occurrences = 1) intercept { actor ! Kill val set = receiveWhile(messages = 3) { - case Logging.Error(_: ActorKilledException, `aname`, "Kill") ⇒ 1 - case Logging.Debug(`aname`, "restarting") ⇒ 2 - case Logging.Debug(`aname`, "restarted") ⇒ 3 + case Logging.Error(_: ActorKilledException, `aname`, `aclass`, "Kill") ⇒ 1 + case Logging.Debug(`aname`, `aclass`, "restarting") ⇒ 2 + case Logging.Debug(`aname`, `aclass`, "restarted") ⇒ 3 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") } system.stop(supervisor) - expectMsg(Logging.Debug(sname, "stopping")) - expectMsg(Logging.Debug(aname, "stopped")) - expectMsg(Logging.Debug(sname, "stopped")) + expectMsg(Logging.Debug(sname, `sclass`, "stopping")) + expectMsg(Logging.Debug(aname, `aclass`, "stopped")) + expectMsg(Logging.Debug(sname, `sclass`, "stopped")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index d87d688231..35cc429fa6 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -135,15 +135,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer)))) val latch1 = new TestLatch(1) - router.!((latch1, busy)) + router ! (latch1, busy) Await.ready(latch1, 2 seconds) val latch2 = new TestLatch(1) - router.!((latch2, busy)) + router ! (latch2, busy) Await.ready(latch2, 2 seconds) val latch3 = new TestLatch(1) - router.!((latch3, busy)) + router ! (latch3, busy) Await.ready(latch3, 2 seconds) Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) @@ -178,7 +178,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - count.set(0) + (10 millis).dilated.sleep for (m ← 0 until loops) { router.!((t, latch, count)) (10 millis).dilated.sleep diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 12fada0880..a9ec39ff6e 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -12,6 +12,8 @@ import akka.dispatch.Await import akka.util.Duration import akka.config.ConfigurationException import com.typesafe.config.ConfigFactory +import java.util.concurrent.ConcurrentHashMap +import com.typesafe.config.Config object RoutingSpec { @@ -21,6 +23,10 @@ object RoutingSpec { router = round-robin nr-of-instances = 3 } + /myrouter { + router = "akka.routing.RoutingSpec$MyRouter" + foo = bar + } } """ @@ -37,6 +43,18 @@ object RoutingSpec { } } + class MyRouter(config: Config) extends RouterConfig { + val foo = config.getString("foo") + def createRoute(routeeProps: Props, actorContext: ActorContext): Route = { + val routees = IndexedSeq(actorContext.actorOf(Props[Echo])) + registerRoutees(actorContext, routees) + + { + case (sender, message) ⇒ Nil + } + } + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -256,6 +274,61 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } } + "smallest mailbox router" must { + "be started when constructed" in { + val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) + } + + "deliver messages to idle actor" in { + val usedActors = new ConcurrentHashMap[Int, String]() + val router = system.actorOf(Props(new Actor { + def receive = { + case (busy: TestLatch, receivedLatch: TestLatch) ⇒ + usedActors.put(0, self.path.toString) + self ! "another in busy mailbox" + receivedLatch.countDown() + Await.ready(busy, TestLatch.DefaultTimeout) + case (msg: Int, receivedLatch: TestLatch) ⇒ + usedActors.put(msg, self.path.toString) + receivedLatch.countDown() + case s: String ⇒ + } + }).withRouter(SmallestMailboxRouter(3))) + + val busy = TestLatch(1) + val received0 = TestLatch(1) + router ! (busy, received0) + Await.ready(received0, TestLatch.DefaultTimeout) + + val received1 = TestLatch(1) + router ! (1, received1) + Await.ready(received1, TestLatch.DefaultTimeout) + + val received2 = TestLatch(1) + router ! (2, received2) + Await.ready(received2, TestLatch.DefaultTimeout) + + val received3 = TestLatch(1) + router ! (3, received3) + Await.ready(received3, TestLatch.DefaultTimeout) + + busy.countDown() + + val busyPath = usedActors.get(0) + busyPath must not be (null) + + val path1 = usedActors.get(1) + val path2 = usedActors.get(2) + val path3 = usedActors.get(3) + + path1 must not be (busyPath) + path2 must not be (busyPath) + path3 must not be (busyPath) + + } + } + "broadcast router" must { "be started when constructed" in { val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1))) @@ -409,6 +482,10 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with sys.shutdown() } } + "support custom router" in { + val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter") + myrouter.isTerminated must be(false) + } } "custom router" must { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 882fccea55..02d1a49035 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -64,8 +64,10 @@ akka { default { # routing (load-balance) scheme to use - # available: "from-code", "round-robin", "random", "scatter-gather", "broadcast" - # or: fully qualified class name of the router class + # available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast" + # or: Fully qualified class name of the router class. + # The router class must extend akka.routing.CustomRouterConfig and and have constructor + # with com.typesafe.config.Config parameter. # default is "from-code"; # Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter). # The type of router can be overridden in the configuration; specifying "from-code" means diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8713df95b4..4681e88cfa 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -112,7 +112,7 @@ object Status { } trait ActorLogging { this: Actor ⇒ - val log = akka.event.Logging(context.system.eventStream, context.self) + val log = akka.event.Logging(context.system, context.self) } object Actor { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index eaad8d0610..5454d54d23 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -358,12 +358,12 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "started (" + actor + ")")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ try { - system.eventStream.publish(Error(e, self.path.toString, "error while creating actor")) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -373,7 +373,7 @@ private[akka] class ActorCell( def recreate(cause: Throwable): Unit = try { val failedActor = actor - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarting")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting")) val freshActor = newActor() if (failedActor ne null) { val c = currentMessage //One read only plz @@ -388,7 +388,7 @@ private[akka] class ActorCell( actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call hotswap = Props.noHotSwap // Reset the behavior freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarted")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) dispatcher.resume(this) //FIXME should this be moved down? @@ -396,7 +396,7 @@ private[akka] class ActorCell( } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ try { - system.eventStream.publish(Error(e, self.path.toString, "error while creating actor")) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -417,7 +417,7 @@ private[akka] class ActorCell( else { // do not process normal messages while waiting for all children to terminate dispatcher suspend this - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them for (child ← c) child.asInstanceOf[InternalActorRef].stop() stopping = true @@ -428,12 +428,12 @@ private[akka] class ActorCell( childrenRefs.get(child.path.name) match { case None ⇒ childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case Some(ChildRestartStats(`child`, _, _)) ⇒ // this is the nominal case where we created the child and entered it in actorCreated() above - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case Some(ChildRestartStats(c, _, _)) ⇒ - system.eventStream.publish(Warning(self.path.toString, "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child)) + system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child)) } } @@ -448,10 +448,10 @@ private[akka] class ActorCell( case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ system.deathWatch.subscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now monitoring " + subject)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) case Unlink(subject) ⇒ system.deathWatch.unsubscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) case Suspend() ⇒ suspend() case Resume() ⇒ resume() case Terminate() ⇒ terminate() @@ -460,7 +460,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ //Should we really catch everything here? - system.eventStream.publish(Error(e, self.path.toString, "error while processing " + message)) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message)) //TODO FIXME How should problems here be handled??? throw e } @@ -480,7 +480,7 @@ private[akka] class ActorCell( currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - system.eventStream.publish(Error(e, self.path.toString, e.getMessage)) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -500,7 +500,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ - system.eventStream.publish(Error(e, self.path.toString, e.getMessage)) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) throw e } } @@ -530,7 +530,8 @@ private[akka] class ActorCell( } def autoReceiveMessage(msg: Envelope) { - if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg)) + if (system.settings.DebugAutoReceive) + system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { case Failed(cause) ⇒ handleFailure(sender, cause) @@ -554,7 +555,8 @@ private[akka] class ActorCell( try { parent.sendSystemMessage(ChildTerminated(self)) system.deathWatch.publish(Terminated(self)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped")) + if (system.settings.DebugLifecycle) + system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) // FIXME: can actor be null? } finally { currentMessage = null clearActorFields() @@ -565,8 +567,8 @@ private[akka] class ActorCell( final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause - case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) - case None ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child)) + case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) + case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } final def handleChildTerminated(child: ActorRef): Unit = { @@ -625,4 +627,9 @@ private[akka] class ActorCell( lookupAndSetField(a.getClass, a, "self", self) } } + + private def clazz(o: AnyRef): Class[_] = { + if (o eq null) this.getClass + else o.getClass + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 254b19e010..94ec966468 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -449,7 +449,10 @@ object DeadLetterActorRef { val serialized = new SerializedDeadLetterActorRef } -class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { +trait DeadLetterActorRefLike extends MinimalActorRef { + + def eventStream: EventStream + @volatile private var brokenPromise: Future[Any] = _ @volatile @@ -477,7 +480,9 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { assert(brokenPromise != null) brokenPromise } +} +class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike { @throws(classOf[java.io.ObjectStreamException]) override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized } @@ -486,8 +491,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { * This special dead letter reference has a name: it is that which is returned * by a local look-up which is unsuccessful. */ -class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) - extends DeadLetterActorRef(_eventStream) { +class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) + extends DeadLetterActorRefLike { init(_dispatcher, _path) override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case d: DeadLetter ⇒ // do NOT form endless loops diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index a4b3db0686..ad1c7de587 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -296,7 +296,7 @@ class LocalActorRefProvider( val nodename: String = "local" val clustername: String = "local" - val log = Logging(eventStream, "LocalActorRefProvider") + val log = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")") /* * generate name for temporary actor refs diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index aafbe1f0e3..e24a3a29f2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -330,7 +330,11 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor // this provides basic logging (to stdout) until .start() is called below val eventStream = new EventStream(DebugEventStream) eventStream.startStdoutLogger(settings) - val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages + + // unfortunately we need logging before we know the rootpath address, which wants to be inserted here + @volatile + private var _log = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass) + def log = _log val scheduler = createScheduler() @@ -383,6 +387,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor private lazy val _start: this.type = { // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) + _log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass) deadLetters.init(dispatcher, lookupRoot.path / "deadLetters") // this starts the reaper actor and the user-configured logging subscribers, which are also actors registerOnTermination(stopScheduler()) @@ -498,4 +503,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } } + + override def toString = lookupRoot.path.root.address.toString } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 35ec05432a..5ac4c13391 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -13,6 +13,7 @@ import akka.event.EventStream import com.typesafe.config._ import akka.routing._ import java.util.concurrent.{ TimeUnit, ConcurrentHashMap } +import akka.util.ReflectiveAccess case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) @@ -56,27 +57,29 @@ class Deployer(val settings: ActorSystem.Settings) { val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) val resizer: Option[Resizer] = if (config.hasPath("resizer")) { - val resizerConfig = deployment.getConfig("resizer") - Some(DefaultResizer( - lowerBound = resizerConfig.getInt("lower-bound"), - upperBound = resizerConfig.getInt("upper-bound"), - pressureThreshold = resizerConfig.getInt("pressure-threshold"), - rampupRate = resizerConfig.getDouble("rampup-rate"), - backoffThreshold = resizerConfig.getDouble("backoff-threshold"), - backoffRate = resizerConfig.getDouble("backoff-rate"), - stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS), - messagesPerResize = resizerConfig.getInt("messages-per-resize"))) + Some(DefaultResizer(deployment.getConfig("resizer"))) } else { None } val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case fqn ⇒ + val constructorSignature = Array[Class[_]](classOf[Config]) + ReflectiveAccess.createInstance[RouterConfig](fqn, constructorSignature, Array[AnyRef](deployment)) match { + case Right(router) ⇒ router + case Left(exception) ⇒ + throw new IllegalArgumentException( + ("Cannot instantiate router [%s], defined in [%s], " + + "make sure it extends [akka.routing.RouterConfig] and has constructor with " + + "[com.typesafe.config.Config] parameter") + .format(fqn, key), exception) + } } val recipe: Option[ActorRecipe] = diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index b49be8c0b5..eb573df767 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -190,7 +190,7 @@ trait FSM[S, D] extends Listeners { type Timeout = Option[Duration] type TransitionHandler = PartialFunction[(S, S), Unit] - val log = Logging(context.system, context.self) + val log = Logging(context.system, this) /** * **************************************** diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 52f35fd952..d9b45ea7c8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -80,7 +80,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl runnable.run() } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) + case e ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) } finally { cleanup() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 79331e0397..2511dbc8e2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -59,7 +59,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e2: RejectedExecutionException ⇒ - prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString)) + prerequisites.eventStream.publish(Warning("Dispatcher", this.getClass, e2.toString)) throw e2 } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 491d79a63b..942bd25a65 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -77,7 +77,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } else { // Note that the configurator of the default dispatcher will be registered for this id, // so this will only be logged once, which is crucial. - prerequisites.eventStream.publish(Warning("Dispatchers", + prerequisites.eventStream.publish(Warning("Dispatchers", this.getClass, "Dispatcher [%s] not configured, using default-dispatcher".format(id))) lookupConfigurator(DefaultDispatcherId) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 933a263732..0675f1c9f2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -325,7 +325,7 @@ object Future { // FIXME catching all and continue isn't good for OOME, ticket #1418 executor match { case m: MessageDispatcher ⇒ - m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage)) + m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", this.getClass, e.getMessage)) case other ⇒ e.printStackTrace() } @@ -566,7 +566,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { protected def logError(msg: String, problem: Throwable): Unit = { executor match { - case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, this.getClass, problem.getMessage)) case other ⇒ problem.printStackTrace() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 3abd961d0f..0da0bf13af 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -214,7 +214,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue } } catch { case e ⇒ - actor.system.eventStream.publish(Error(e, actor.self.path.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) + actor.system.eventStream.publish(Error(e, actor.self.path.toString, actor.actor.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } } diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index eea9deff35..6ad2d0fbdf 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -38,19 +38,19 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su } override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { - if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) super.subscribe(subscriber, channel) } override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { val ret = super.unsubscribe(subscriber, channel) - if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel)) ret } override def unsubscribe(subscriber: ActorRef) { super.unsubscribe(subscriber) - if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels")) + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels")) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index bfd0f2a184..07a1da1da5 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -16,10 +16,6 @@ import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException import akka.dispatch.Await -object LoggingBus { - implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream -} - /** * This trait brings log level handling to the EventStream: it reads the log * levels for the initial logging (StandardOutLogger) and the loggers & level @@ -75,7 +71,7 @@ trait LoggingBus extends ActorEventBus { */ private[akka] def startStdoutLogger(config: Settings) { val level = levelFor(config.StdoutLogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), this.getClass, "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) ErrorLevel } AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) @@ -83,15 +79,16 @@ trait LoggingBus extends ActorEventBus { loggers = Seq(StandardOutLogger) _logLevel = level } - publish(Debug(simpleName(this), "StandardOutLogger started")) + publish(Debug(simpleName(this), this.getClass, "StandardOutLogger started")) } /** * Internal Akka use only */ private[akka] def startDefaultLoggers(system: ActorSystemImpl) { + val logName = simpleName(this) + "(" + system + ")" val level = levelFor(system.settings.LogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, logName, this.getClass, "unknown akka.stdout-loglevel " + system.settings.LogLevel)) ErrorLevel } try { @@ -105,7 +102,7 @@ trait LoggingBus extends ActorEventBus { } yield { try { ReflectiveAccess.getClassFor[Actor](loggerName) match { - case Right(actorClass) ⇒ addLogger(system, actorClass, level) + case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) case Left(exception) ⇒ throw exception } } catch { @@ -119,7 +116,7 @@ trait LoggingBus extends ActorEventBus { loggers = myloggers _logLevel = level } - publish(Debug(simpleName(this), "Default Loggers started")) + publish(Debug(logName, this.getClass, "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) } @@ -138,7 +135,7 @@ trait LoggingBus extends ActorEventBus { val level = _logLevel // volatile access before reading loggers if (!(loggers contains StandardOutLogger)) { AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) - publish(Debug(simpleName(this), "shutting down: StandardOutLogger started")) + publish(Debug(simpleName(this), this.getClass, "shutting down: StandardOutLogger started")) } for { logger ← loggers @@ -151,33 +148,105 @@ trait LoggingBus extends ActorEventBus { case _ ⇒ } } - publish(Debug(simpleName(this), "all default loggers stopped")) + publish(Debug(simpleName(this), this.getClass, "all default loggers stopped")) } - private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { + private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) implicit val timeout = Timeout(3 seconds) val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ - publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) + publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) } if (response != LoggerInitialized) throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response) AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) - publish(Debug(simpleName(this), "logger " + name + " started")) + publish(Debug(logName, this.getClass, "logger " + name + " started")) actor } } +/** + * This trait defines the interface to be provided by a “log source formatting + * rule” as used by [[akka.event.Logging]]’s `apply`/`create` method. + * + * See the companion object for default implementations. + * + * Example: + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * } + * + * class MyClass extends MyType { + * val log = Logging(eventStream, this) // will use "hallo" as logSource + * def name = "hallo" + * } + * }}} + * + * The second variant is used for including the actor system’s address: + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * def genString(a: MyType, s: ActorSystem) = a.name + "," + s + * } + * + * class MyClass extends MyType { + * val sys = ActorSyste("sys") + * val log = Logging(sys, this) // will use "hallo,akka://sys" as logSource + * def name = "hallo" + * } + * }}} + * + * The default implementation of the second variant will just call the first. + */ trait LogSource[-T] { def genString(t: T): String + def genString(t: T, system: ActorSystem): String = genString(t) + def getClazz(t: T): Class[_] = t.getClass } +/** + * This is a “marker” class which is inserted as originator class into + * [[akka.event.LogEvent]] when the string representation was supplied + * directly. + */ +class DummyClassForStringSources + +/** + * This object holds predefined formatting rules for log sources. + * + * In case an [[akka.actor.ActorSystem]] is provided, the following apply: + *

    + *
  • [[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path
  • + *
  • providing a `String` as source will append "()" and use the result
  • + *
  • providing a `Class` will extract its simple name, append "()" and use the result
  • + *
  • anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it
  • + *
+ * + * In case a [[akka.event.LoggingBus]] is provided, the following apply: + *
    + *
  • [[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path
  • + *
  • providing a `String` as source will be used as is
  • + *
  • providing a `Class` will extract its simple name
  • + *
  • anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it
  • + *
+ */ object LogSource { implicit val fromString: LogSource[String] = new LogSource[String] { def genString(s: String) = s + override def genString(s: String, system: ActorSystem) = s + "(" + system + ")" + override def getClazz(s: String) = classOf[DummyClassForStringSources] } implicit val fromActor: LogSource[Actor] = new LogSource[Actor] { @@ -191,18 +260,55 @@ object LogSource { // this one unfortunately does not work as implicit, because existential types have some weird behavior val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] { def genString(c: Class[_]) = simpleName(c) + override def genString(c: Class[_], system: ActorSystem) = simpleName(c) + "(" + system + ")" + override def getClazz(c: Class[_]) = c } implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]] - def apply[T: LogSource](o: T) = implicitly[LogSource[T]].genString(o) + /** + * Convenience converter access: given an implicit `LogSource`, generate the + * string representation and originating class. + */ + def apply[T: LogSource](o: T): (String, Class[_]) = { + val ls = implicitly[LogSource[T]] + (ls.genString(o), ls.getClazz(o)) + } - def fromAnyRef(o: AnyRef): String = + /** + * Convenience converter access: given an implicit `LogSource` and + * [[akka.actor.ActorSystem]], generate the string representation and + * originating class. + */ + def apply[T: LogSource](o: T, system: ActorSystem): (String, Class[_]) = { + val ls = implicitly[LogSource[T]] + (ls.genString(o, system), ls.getClazz(o)) + } + + /** + * construct string representation for any object according to + * rules above with fallback to its `Class`’s simple name. + */ + def fromAnyRef(o: AnyRef): (String, Class[_]) = o match { - case c: Class[_] ⇒ fromClass.genString(c) - case a: Actor ⇒ fromActor.genString(a) - case a: ActorRef ⇒ fromActorRef.genString(a) - case s: String ⇒ s - case x ⇒ simpleName(x) + case c: Class[_] ⇒ apply(c) + case a: Actor ⇒ apply(a) + case a: ActorRef ⇒ apply(a) + case s: String ⇒ apply(s) + case x ⇒ (simpleName(x), x.getClass) + } + + /** + * construct string representation for any object according to + * rules above (including the actor system’s address) with fallback to its + * `Class`’s simple name. + */ + def fromAnyRef(o: AnyRef, system: ActorSystem): (String, Class[_]) = + o match { + case c: Class[_] ⇒ apply(c) + case a: Actor ⇒ apply(a) + case a: ActorRef ⇒ apply(a) + case s: String ⇒ apply(s) + case x ⇒ (simpleName(x) + "(" + system + ")", x.getClass) } } @@ -218,6 +324,11 @@ object LogSource { * log.info("hello world!") * * + * The source object is used in two fashions: its `Class[_]` will be part of + * all log events produced by this logger, plus a string representation is + * generated which may contain per-instance information, see `apply` or `create` + * below. + * * Loggers are attached to the level-specific channels Error, * Warning, Info and Debug as * appropriate for the configured (or set) log level. If you want to implement @@ -305,42 +416,80 @@ object Logging { val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern /** - * Obtain LoggingAdapter for the given event stream (system) and source object. - * Note that there is an implicit conversion from [[akka.actor.ActorSystem]] - * to [[akka.event.LoggingBus]]. + * Obtain LoggingAdapter for the given actor system and source object. This + * will use the system’s event stream and include the system’s address in the + * log source string. * - * The source is used to identify the source of this logging channel and must have - * a corresponding LogSource[T] instance in scope; by default these are - * provided for Class[_], Actor, ActorRef and String types. The source - * object is translated to a String according to the following rules: - *
    - *
  • if it is an Actor or ActorRef, its path is used
  • - *
  • in case of a String it is used as is
  • - *
  • in case of a class an approximation of its simpleName - *
  • and in all other cases the simpleName of its class
  • - *
+ * Do not use this if you want to supply a log category string (like + * “com.example.app.whatever”) unaltered, supply `system.eventStream` in this + * case or use + * + * {{{ + * Logging(system, this.getClass) + * }}} + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. + * + * You can add your own rules quite easily, see [[akka.event.LogSource]]. */ - def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter = - new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource)) + def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = { + val (str, clazz) = LogSource(logSource, system) + new BusLogging(system.eventStream, str, clazz) + } /** - * Java API: Obtain LoggingAdapter for the given system and source object. The - * source object is used to identify the source of this logging channel. The source - * object is translated to a String according to the following rules: - *
    - *
  • if it is an Actor or ActorRef, its path is used
  • - *
  • in case of a String it is used as is
  • - *
  • in case of a class an approximation of its simpleName - *
  • and in all other cases the simpleName of its class
  • - *
+ * Obtain LoggingAdapter for the given logging bus and source object. + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. + * + * You can add your own rules quite easily, see [[akka.event.LogSource]]. */ - def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource)) + def apply[T: LogSource](bus: LoggingBus, logSource: T): LoggingAdapter = { + val (str, clazz) = LogSource(logSource) + new BusLogging(bus, str, clazz) + } /** - * Java API: Obtain LoggingAdapter for the given event bus and source object. The - * source object is used to identify the source of this logging channel. + * Obtain LoggingAdapter for the given actor system and source object. This + * will use the system’s event stream and include the system’s address in the + * log source string. + * + * Do not use this if you want to supply a log category string (like + * “com.example.app.whatever”) unaltered, supply `system.eventStream` in this + * case or use + * + * {{{ + * Logging.getLogger(system, this.getClass()); + * }}} + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. */ - def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) + def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = { + val (str, clazz) = LogSource.fromAnyRef(logSource, system) + new BusLogging(system.eventStream, str, clazz) + } + + /** + * Obtain LoggingAdapter for the given logging bus and source object. + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. + */ + def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = { + val (str, clazz) = LogSource.fromAnyRef(logSource) + new BusLogging(bus, str, clazz) + } /** * Artificial exception injected into Error events if no Throwable is @@ -362,19 +511,34 @@ object Logging { * The LogLevel of this LogEvent */ def level: LogLevel + + /** + * The source of this event + */ + def logSource: String + + /** + * The class of the source of this event + */ + def logClass: Class[_] + + /** + * The message, may be any object or null. + */ + def message: Any } /** * For ERROR Logging */ - case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent { - def this(logSource: String, message: Any) = this(Error.NoCause, logSource, message) + case class Error(cause: Throwable, logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { + def this(logSource: String, logClass: Class[_], message: Any) = this(Error.NoCause, logSource, logClass, message) override def level = ErrorLevel } object Error { - def apply(logSource: String, message: Any) = new Error(NoCause, logSource, message) + def apply(logSource: String, logClass: Class[_], message: Any) = new Error(NoCause, logSource, logClass, message) /** Null Object used for errors without cause Throwable */ object NoCause extends NoStackTrace @@ -383,21 +547,21 @@ object Logging { /** * For WARNING Logging */ - case class Warning(logSource: String, message: Any = "") extends LogEvent { + case class Warning(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { override def level = WarningLevel } /** * For INFO Logging */ - case class Info(logSource: String, message: Any = "") extends LogEvent { + case class Info(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { override def level = InfoLevel } /** * For DEBUG Logging */ - case class Debug(logSource: String, message: Any = "") extends LogEvent { + case class Debug(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { override def level = DebugLevel } @@ -439,7 +603,7 @@ object Logging { case e: Warning ⇒ warning(e) case e: Info ⇒ info(e) case e: Debug ⇒ debug(e) - case e ⇒ warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e)) + case e ⇒ warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e)) } } @@ -626,7 +790,7 @@ trait LoggingAdapter { } } -class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter { +class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter { import Logging._ @@ -635,14 +799,14 @@ class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdap def isInfoEnabled = bus.logLevel >= InfoLevel def isDebugEnabled = bus.logLevel >= DebugLevel - protected def notifyError(message: String) { bus.publish(Error(logSource, message)) } + protected def notifyError(message: String) { bus.publish(Error(logSource, logClass, message)) } - protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) } + protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, logClass, message)) } - protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) } + protected def notifyWarning(message: String) { bus.publish(Warning(logSource, logClass, message)) } - protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) } + protected def notifyInfo(message: String) { bus.publish(Info(logSource, logClass, message)) } - protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) } + protected def notifyDebug(message: String) { bus.publish(Debug(logSource, logClass, message)) } } diff --git a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala index 250af89812..27d829de5e 100644 --- a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala +++ b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala @@ -36,7 +36,8 @@ object LoggingReceive { class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o)) + val (str, clazz) = LogSource.fromAnyRef(source) + system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) handled } def apply(o: Any): Unit = r(o) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 5e7c9ae701..0473f99fd6 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -7,8 +7,10 @@ import akka.actor._ import akka.dispatch.Future import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeUnit import akka.util.{ Duration, Timeout } import akka.util.duration._ +import com.typesafe.config.Config import akka.config.ConfigurationException import scala.collection.JavaConversions.iterableAsScalaIterable @@ -286,12 +288,15 @@ object RoundRobinRouter { * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the round robin should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RoundRobinLike { @@ -307,9 +312,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -361,12 +368,15 @@ object RandomRouter { * A Router that randomly selects one of the target connections to send a message to. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RandomLike { @@ -382,9 +392,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -424,6 +436,159 @@ trait RandomLike { this: RouterConfig ⇒ } } +object SmallestMailboxRouter { + def apply(routees: Iterable[ActorRef]) = new SmallestMailboxRouter(routees = routees map (_.path.toString)) + + /** + * Java API to create router with the supplied 'routees' actors. + */ + def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = { + import scala.collection.JavaConverters._ + apply(routees.asScala) + } +} +/** + * A Router that tries to send to the non-suspended routee with fewest messages in mailbox. + * The selection is done in this order: + *
    + *
  • pick any idle routee (not processing message) with empty mailbox
  • + *
  • pick any routee with empty mailbox
  • + *
  • pick routee with fewest pending messages in mailbox
  • + *
  • pick any remote routee, remote actors are consider lowest priority, + * since their mailbox size is unknown
  • + *
+ * + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ +case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) + extends RouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the routees to be used. + * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) + } + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(resizer = Some(resizer)) +} + +trait SmallestMailboxLike { this: RouterConfig ⇒ + + import java.security.SecureRandom + + def nrOfInstances: Int + + def routees: Iterable[String] + + private val random = new ThreadLocal[SecureRandom] { + override def initialValue = SecureRandom.getInstance("SHA1PRNG") + } + + /** + * Returns true if the actor is currently processing a message. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def isProcessingMessage(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ + val cell = x.underlying + cell.mailbox.isScheduled && cell.currentMessage != null + case _ ⇒ false + } + + /** + * Returns true if the actor currently has any pending messages + * in the mailbox, i.e. the mailbox is not empty. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def hasMessages(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages + case _ ⇒ false + } + + /** + * Returns true if the actor is currently suspended. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def isSuspended(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ + val cell = x.underlying + cell.mailbox.isSuspended + case _ ⇒ false + } + + /** + * Returns the number of pending messages in the mailbox of the actor. + * It will always return 0 for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def numberOfMessages(a: ActorRef): Int = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages + case _ ⇒ 0 + } + + def createRoute(props: Props, context: ActorContext): Route = { + val ref = context.self.asInstanceOf[RoutedActorRef] + createAndRegisterRoutees(props, context, nrOfInstances, routees) + + def getNext(): ActorRef = { + // non-local actors mailbox size is unknown, so consider them lowest priority + val activeLocal = ref.routees collect { case l: LocalActorRef if !isSuspended(l) ⇒ l } + // 1. anyone not processing message and with empty mailbox + activeLocal.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a)) getOrElse { + // 2. anyone with empty mailbox + activeLocal.find(a ⇒ !hasMessages(a)) getOrElse { + // 3. sort on mailbox size + activeLocal.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse { + // 4. no locals, just pick one, random + ref.routees(random.get.nextInt(ref.routees.size)) + } + } + } + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, ref.routees) + case msg ⇒ List(Destination(sender, getNext())) + } + } + } +} + object BroadcastRouter { def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString)) @@ -439,12 +604,15 @@ object BroadcastRouter { * A Router that uses broadcasts a message to all its connections. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with BroadcastLike { @@ -460,9 +628,11 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -507,12 +677,15 @@ object ScatterGatherFirstCompletedRouter { * Simple router that broadcasts the message to all routees, and replies with the first response. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, override val resizer: Option[Resizer] = None) @@ -529,9 +702,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String], w: Duration) = { - this(routees = iterableAsScalaIterable(t), within = w) + def this(routeePaths: java.lang.Iterable[String], w: Duration) = { + this(routees = iterableAsScalaIterable(routeePaths), within = w) } /** @@ -587,6 +762,19 @@ trait Resizer { def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) } +case object DefaultResizer { + def apply(resizerConfig: Config): DefaultResizer = + DefaultResizer( + lowerBound = resizerConfig.getInt("lower-bound"), + upperBound = resizerConfig.getInt("upper-bound"), + pressureThreshold = resizerConfig.getInt("pressure-threshold"), + rampupRate = resizerConfig.getDouble("rampup-rate"), + backoffThreshold = resizerConfig.getDouble("backoff-threshold"), + backoffRate = resizerConfig.getDouble("backoff-rate"), + stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS), + messagesPerResize = resizerConfig.getInt("messages-per-resize")) +} + case class DefaultResizer( /** * The fewest number of routees the router should ever have. diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala index bcfd5d2477..9a9f0530fb 100644 --- a/akka-actor/src/main/scala/akka/util/JMX.scala +++ b/akka-actor/src/main/scala/akka/util/JMX.scala @@ -21,7 +21,7 @@ object JMX { case e: InstanceAlreadyExistsException ⇒ Some(mbeanServer.getObjectInstance(name)) case e: Exception ⇒ - system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean))) + system.eventStream.publish(Error(e, "JMX", this.getClass, "Error when registering mbean [%s]".format(mbean))) None } @@ -29,6 +29,6 @@ object JMX { mbeanServer.unregisterMBean(mbean) } catch { case e: InstanceNotFoundException ⇒ {} - case e: Exception ⇒ system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean))) + case e: Exception ⇒ system.eventStream.publish(Error(e, "JMX", this.getClass, "Error while unregistering mbean [%s]".format(mbean))) } } diff --git a/akka-docs/general/jmm.rst b/akka-docs/general/jmm.rst index 23871449ef..7d806f9ac8 100644 --- a/akka-docs/general/jmm.rst +++ b/akka-docs/general/jmm.rst @@ -1,3 +1,5 @@ +.. _jmm: + Akka and the Java Memory Model ================================ diff --git a/akka-docs/intro/deployment-scenarios.rst b/akka-docs/intro/deployment-scenarios.rst index c76284d62c..f8d6f6b197 100644 --- a/akka-docs/intro/deployment-scenarios.rst +++ b/akka-docs/intro/deployment-scenarios.rst @@ -1,4 +1,3 @@ - .. _deployment-scenarios: ################################### @@ -28,7 +27,7 @@ Actors as services ^^^^^^^^^^^^^^^^^^ The simplest way you can use Akka is to use the actors as services in your Web -application. All that’s needed to do that is to put the Akka charts as well as +application. All that’s needed to do that is to put the Akka jars as well as its dependency jars into ``WEB-INF/lib``. You also need to put the :ref:`configuration` file in the ``$AKKA_HOME/config`` directory. Now you can create your Actors as regular services referenced from your Web application. You should also diff --git a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java index c8d8b019bb..2125ae35a8 100644 --- a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java @@ -7,6 +7,7 @@ import akka.routing.ScatterGatherFirstCompletedRouter; import akka.routing.BroadcastRouter; import akka.routing.RandomRouter; import akka.routing.RoundRobinRouter; +import akka.routing.SmallestMailboxRouter; import akka.actor.UntypedActor; import akka.actor.ActorRef; import akka.actor.Props; @@ -34,6 +35,14 @@ public class ParentActor extends UntypedActor { randomRouter.tell(i, getSelf()); } //#randomRouter + } else if (msg.equals("smr")) { + //#smallestMailboxRouter + ActorRef smallestMailboxRouter = getContext().actorOf( + new Props(PrintlnActor.class).withRouter(new SmallestMailboxRouter(5)), "router"); + for (int i = 1; i <= 10; i++) { + smallestMailboxRouter.tell(i, getSelf()); + } + //#smallestMailboxRouter } else if (msg.equals("br")) { //#broadcastRouter ActorRef broadcastRouter = getContext().actorOf(new Props(PrintlnActor.class).withRouter(new BroadcastRouter(5)), diff --git a/akka-docs/java/logging.rst b/akka-docs/java/logging.rst index 20920d940b..ffee92d00e 100644 --- a/akka-docs/java/logging.rst +++ b/akka-docs/java/logging.rst @@ -17,8 +17,13 @@ as illustrated in this example: .. includecode:: code/akka/docs/event/LoggingDocTestBase.java :include: imports,my-actor -The second parameter to the ``Logging.getLogger`` is the source of this logging channel. -The source object is translated to a String according to the following rules: +The first parameter to ``Logging.getLogger`` could also be any +:class:`LoggingBus`, specifically ``system.eventStream()``; in the demonstrated +case, the actor system’s address is included in the ``akkaSource`` +representation of the log source (see `Logging Thread and Akka Source in MDC`_) +while in the second case this is not automatically done. The second parameter +to ``Logging.getLogger`` is the source of this logging channel. The source +object is translated to a String according to the following rules: * if it is an Actor or ActorRef, its path is used * in case of a String it is used as is @@ -28,6 +33,13 @@ The source object is translated to a String according to the following rules: The log message may contain argument placeholders ``{}``, which will be substituted if the log level is enabled. +The Java :class:`Class` of the log source is also included in the generated +:class:`LogEvent`. In case of a simple string this is replaced with a “marker” +class :class:`akka.event.DummyClassForStringSources` in order to allow special +treatment of this case, e.g. in the SLF4J event listener which will then use +the string instead of the class’ name for looking up the logger instance to +use. + Event Handler ============= @@ -83,8 +95,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger. loglevel = "DEBUG" } -Logging thread in MDC ---------------------- +Logging Thread and Akka Source in MDC +------------------------------------- Since the logging is done asynchronously the thread in which the logging was performed is captured in Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``. @@ -96,3 +108,22 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi +.. note:: + + It will probably be a good idea to use the ``sourceThread`` MDC value also in + non-Akka parts of the application in order to have this property consistently + available in the logs. + +Another helpful facility is that Akka captures the actor’s address when +instantiating a logger within it, meaning that the full instance identification +is available for associating log messages e.g. with members of a router. This +information is available in the MDC with attribute name ``akkaSource``:: + + + + %date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n + + + +For more details on what this attribute contains—also for non-actors—please see +`How to Log`_. diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index e80514a8fe..42ad1108ea 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -44,9 +45,8 @@ You can also give the router already created routees as in: When you create a router programatically you define the number of routees *or* you pass already created routees to it. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. -*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in -the configuration file then this value will be used instead of any programmatically sent parameters, but you must -also define the ``router`` property in the configuration.* +*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used +instead of any programmatically sent parameters.* Once you have the router actor it is just to send messages to it as you would to any actor: @@ -122,6 +122,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the non-suspended routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/jrouting/ParentActor.java#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. @@ -241,6 +256,14 @@ If you are interested in how to use the VoteCountRouter it looks like this: .. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest +Configured Custom Router +************************ + +It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment +configuration you define the fully qualified class name of the router class. The router class must extend +``akka.routing.CustomRouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter. +The deployment section of the configuration is passed to the constructor. + Custom Resizer ************** diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index 44fd51884c..0c8d239d03 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -4,6 +4,10 @@ Migration Guide 1.3.x to 2.0.x ################################ +.. sidebar:: Contents + + .. contents:: :local: + Actors ====== @@ -13,9 +17,177 @@ significant amount of time. Detailed migration guide will be written. +Migration Kit +============= + +Nobody likes a big refactoring that takes several days to complete until +anything is able to run again. Therefore we provide a migration kit that +makes it possible to do the migration changes in smaller steps. + +The migration kit only covers the most common usage of Akka. It is not intended +as a final solution. The whole migration kit is deprecated and will be removed in +Akka 2.1. + +The migration kit is provided in separate jar files. Add the following dependency:: + + "com.typesafe.akka" % "akka-actor-migration" % "2.0-SNAPSHOT" + +The first step of the migration is to do some trivial replacements. +Search and replace the following (be careful with the non qualified names): + +==================================== ==================================== +Search Replace with +==================================== ==================================== +``akka.actor.Actor`` ``akka.actor.OldActor`` +``extends Actor`` ``extends OldActor`` +``akka.actor.Scheduler`` ``akka.actor.OldScheduler`` +``Scheduler`` ``OldScheduler`` +``akka.event.EventHandler`` ``akka.event.OldEventHandler`` +``EventHandler`` ``OldEventHandler`` +``akka.config.Config`` ``akka.config.OldConfig`` +``Config`` ``OldConfig`` +==================================== ==================================== + +For Scala users the migration kit also contains some implicit conversions to be +able to use some old methods. These conversions are useful from tests or other +code used outside actors. + +:: + + import akka.migration._ + +Thereafter you need to fix compilation errors that are not handled by the migration +kit, such as: + +* Definition of supervisors +* Definition of dispatchers +* ActorRegistry + +When everything compiles you continue by replacing/removing the ``OldXxx`` classes +one-by-one from the migration kit with appropriate migration. + +When using the migration kit there will be one global actor system, which loads +the configuration ``akka.conf`` from the same locations as in Akka 1.x. +This means that while you are using the migration kit you should not create your +own ``ActorSystem``, but instead use the ``akka.actor.GlobalActorSystem``. +In order to voluntarily exit the JVM you must ``shutdown`` the ``GlobalActorSystem`` +Last task of the migration would be to create your own ``ActorSystem``. + + Unordered Collection of Migration Items ======================================= +Creating and starting actors +---------------------------- + +Actors are created by passing in a ``Props`` instance into the actorOf factory method in +a ``ActorRefProvider``, which is the ``ActorSystem`` or ``ActorContext``. +Use the system to create top level actors. Use the context to +create actors from other actors. The difference is how the supervisor hierarchy is arranged. +When using the context the current actor will be supervisor of the created child actor. +When using the system it will be a top level actor, that is supervised by the system +(internal guardian actor). + +``ActorRef.start()`` has been removed. Actors are now started automatically when created. +Remove all invocations of ``ActorRef.start()``. + +v1.3:: + + val myActor = Actor.actorOf[MyActor] + myActor.start() + +v2.0:: + + // top level actor + val firstActor = system.actorOf(Props[FirstActor], name = "first") + + // child actor + class FirstActor extends Actor { + val myActor = context.actorOf(Props[MyActor], name = "myactor") + +Documentation: + + * :ref:`actors-scala` + * :ref:`untyped-actors-java` + +Stopping actors +--------------- + +``ActorRef.stop()`` has been moved. Use ``ActorSystem`` or ``ActorContext`` to stop actors. + +v1.3:: + + actorRef.stop() + self.stop() + actorRef ! PoisonPill + +v2.0:: + + context.stop(someChild) + context.stop(self) + system.stop(actorRef) + actorRef ! PoisonPill + +*Stop all actors* + +v1.3:: + + ActorRegistry.shutdownAll() + +v2.0:: + + system.shutdown() + +Documentation: + + * :ref:`actors-scala` + * :ref:`untyped-actors-java` + +Identifying Actors +------------------ + +In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``. + +The ``ActorRegistry`` has been replaced by actor paths and lookup with +``actorFor`` in ``ActorRefProvider`` (``ActorSystem`` or ``ActorContext``). + +v1.3:: + + val actor = Actor.registry.actorFor(uuid) + val actors = Actor.registry.actorsFor(id) + +v2.0:: + + val actor = context.actorFor("/user/serviceA/aggregator") + +Documentation: + + * :ref:`addressing` + * :ref:`actors-scala` + * :ref:`untyped-actors-java` + +Reply to messages +----------------- + +``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala) +or ``getSender()`` (Java). This works for both tell (!) and ask (?). + +v1.3:: + + self.channel ! result + self.channel tryTell result + self.reply(result) + self.tryReply(result) + +v2.0:: + + sender ! result + +Documentation: + + * :ref:`actors-scala` + * :ref:`untyped-actors-java` + ``ActorRef.ask()`` ------------------ @@ -28,7 +200,185 @@ reply to be received; it is independent of the timeout applied when awaiting completion of the :class:`Future`, however, the actor will complete the :class:`Future` with an :class:`AskTimeoutException` when it stops itself. +Documentation: + + * :ref:`actors-scala` + * :ref:`untyped-actors-java` + ActorPool --------- The ActorPool has been replaced by dynamically resizable routers. + +Documentation: + + * :ref:`routing-scala` + * :ref:`routing-java` + +``UntypedActor.getContext()`` (Java API only) +--------------------------------------------- + +``getContext()`` in the Java API for UntypedActor is renamed to +``getSelf()``. + +v1.3:: + + actorRef.tell("Hello", getContext()); + +v2.0:: + + actorRef.tell("Hello", getSelf()); + +Documentation: + + * :ref:`untyped-actors-java` + +Logging +------- + +EventHandler API has been replaced by LoggingAdapter, which publish log messages +to the event bus. You can still plugin your own actor as event listener with the +``akka.event-handlers`` configuration property. + +v1.3:: + + EventHandler.error(exception, this, message) + EventHandler.warning(this, message) + EventHandler.info(this, message) + EventHandler.debug(this, message) + EventHandler.debug(this, "Processing took %s ms".format(duration)) + +v2.0:: + + import akka.event.Logging + + val log = Logging(context.system, this) + log.error(exception, this, message) + log.warning(this, message) + log.info(this, message) + log.debug(this, message) + log.debug(this, "Processing took {} ms", duration) + +Documentation: + + * :ref:`logging-scala` + * :ref:`logging-java` + * :ref:`event-bus-scala` + * :ref:`event-bus-java` + +Supervision +----------- + +Akka v2.0 implements parental supervision. Actors can only be created by other actors — where the top-level +actor is provided by the library — and each created actor is supervised by its parent. +In contrast to the special supervision relationship between parent and child, each actor may monitor any +other actor for termination. + +v1.3:: + + self.link(actorRef) + self.unlink(actorRef) + +v2.0:: + + class WatchActor extends Actor { + val actorRef = ... + // Terminated message will be delivered when the actorRef actor + // is stopped + context.watch(actorRef) + + val supervisedChild = context.actorOf(Props[ChildActor]) + + def receive = { + case Terminated(`actorRef`) ⇒ ... + } + } + +Note that ``link`` in v1.3 established a supervision relation, which ``watch`` doesn't. +``watch`` is only a way to get notification, ``Terminated`` message, when the monitored +actor has been stopped. + +*Refererence to the supervisor* + +v1.3:: + + self.supervisor + +v2.0:: + + context.parent + +*Fault handling strategy* + +v1.3:: + + val supervisor = Supervisor( + SupervisorConfig( + AllForOneStrategy(List(classOf[Exception]), 3, 1000), + Supervise( + actorOf[MyActor1], + Permanent) :: + Supervise( + actorOf[MyActor2], + Permanent) :: + Nil)) + +v2.0:: + + val strategy = OneForOneStrategy({ + case _: ArithmeticException ⇒ Resume + case _: NullPointerException ⇒ Restart + case _: IllegalArgumentException ⇒ Stop + case _: Exception ⇒ Escalate + }: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000)) + + val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(strategy), "supervisor") + +Documentation: + + * :ref:`supervision` + * :ref:`fault-tolerance-java` + * :ref:`fault-tolerance-scala` + * :ref:`actors-scala` + * :ref:`untyped-actors-java` + +Spawn +----- + +``spawn`` has been removed and can be implemented like this, if needed. Be careful to not +access any shared mutable state closed over by the body. + +:: + + def spawn(body: ⇒ Unit) { + system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) })) ! "go" + } + +Documentation: + + * :ref:`jmm` + +HotSwap +------- + +In v2.0 ``become`` and ``unbecome`` metods are located in ``ActorContext``, i.e. ``context.become`` and ``context.unbecome``. + +The special ``HotSwap`` and ``RevertHotswap`` messages in v1.3 has been removed. Similar can be +implemented with your own message and using ``context.become`` and ``context.unbecome`` +in the actor receiving the message. + + * :ref:`actors-scala` + * :ref:`untyped-actors-java` + +More to be written +------------------ + +* Futures +* Dispatchers +* STM +* TypedActors +* Routing +* Remoting +* Scheduler +* Configuration +* ...? \ No newline at end of file diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 21f3492370..8f0b344558 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -7,6 +7,9 @@ package akka.docs.actor import akka.actor.Actor import akka.actor.Props import akka.event.Logging + +//#imports1 + import akka.dispatch.Future import akka.actor.ActorSystem import org.scalatest.{ BeforeAndAfterAll, WordSpec } @@ -162,10 +165,10 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { system.eventStream.subscribe(testActor, classOf[Logging.Info]) myActor ! "test" - expectMsgPF(1 second) { case Logging.Info(_, "received test") ⇒ true } + expectMsgPF(1 second) { case Logging.Info(_, _, "received test") ⇒ true } myActor ! "unknown" - expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") ⇒ true } + expectMsgPF(1 second) { case Logging.Info(_, _, "received unknown message") ⇒ true } system.eventStream.unsubscribe(testActor) system.eventStream.publish(TestEvent.UnMute(filter)) diff --git a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala index ffa56a3064..652c36af3f 100644 --- a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala @@ -38,15 +38,33 @@ object LoggingDocSpec { class MyEventListener extends Actor { def receive = { - case InitializeLogger(_) ⇒ sender ! LoggerInitialized - case Error(cause, logSource, message) ⇒ // ... - case Warning(logSource, message) ⇒ // ... - case Info(logSource, message) ⇒ // ... - case Debug(logSource, message) ⇒ // ... + case InitializeLogger(_) ⇒ sender ! LoggerInitialized + case Error(cause, logSource, logClass, message) ⇒ // ... + case Warning(logSource, logClass, message) ⇒ // ... + case Info(logSource, logClass, message) ⇒ // ... + case Debug(logSource, logClass, message) ⇒ // ... } } //#my-event-listener + //#my-source + import akka.event.LogSource + import akka.actor.ActorSystem + + object MyType { + implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { + def genString(o: AnyRef): String = o.getClass.getName + override def getClazz(o: AnyRef): Class[_] = o.getClass + } + } + + class MyType(system: ActorSystem) { + import MyType._ + import akka.event.Logging + + val log = Logging(system, this) + } + //#my-source } class LoggingDocSpec extends AkkaSpec { diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index d688da6544..3a9f566ed8 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -8,6 +8,7 @@ import annotation.tailrec import akka.actor.{ Props, Actor } import akka.util.duration._ import akka.dispatch.Await +import akka.routing.SmallestMailboxRouter case class FibonacciNumber(nbr: Int) @@ -59,6 +60,14 @@ class ParentActor extends Actor { i ⇒ randomRouter ! i } //#randomRouter + case "smr" ⇒ + //#smallestMailboxRouter + val smallestMailboxRouter = + context.actorOf(Props[PrintlnActor].withRouter(SmallestMailboxRouter(5)), "router") + 1 to 10 foreach { + i ⇒ smallestMailboxRouter ! i + } + //#smallestMailboxRouter case "br" ⇒ //#broadcastRouter val broadcastRouter = diff --git a/akka-docs/scala/logging.rst b/akka-docs/scala/logging.rst index 35f4e838ff..debafcedc5 100644 --- a/akka-docs/scala/logging.rst +++ b/akka-docs/scala/logging.rst @@ -22,6 +22,8 @@ For convenience you can mixin the ``log`` member into actors, instead of definin .. code-block:: scala class MyActor extends Actor with akka.actor.ActorLogging { + ... + } The second parameter to the ``Logging`` is the source of this logging channel. The source object is translated to a String according to the following rules: @@ -29,17 +31,46 @@ The source object is translated to a String according to the following rules: * if it is an Actor or ActorRef, its path is used * in case of a String it is used as is * in case of a class an approximation of its simpleName - * and in all other cases the simpleName of its class + * and in all other cases a compile error occurs unless and implicit + :class:`LogSource[T]` is in scope for the type in question. The log message may contain argument placeholders ``{}``, which will be substituted if the log level is enabled. +Translating Log Source to String and Class +------------------------------------------ + +The rules for translating the source object to the source string and class +which are inserted into the :class:`LogEvent` during runtime are implemented +using implicit parameters and thus fully customizable: simply create your own +instance of :class:`LogSource[T]` and have it in scope when creating the +logger. + +.. includecode:: code/akka/docs/event/LoggingDocSpec.scala#my-source + +This example creates a log source which mimics traditional usage of Java +loggers, which are based upon the originating object’s class name as log +category. The override of :meth:`getClazz` is only included for demonstration +purposes as it contains exactly the default behavior. + +.. note:: + + You may also create the string representation up front and pass that in as + the log source, but be aware that then the :class:`Class[_]` which will be + put in the :class:`LogEvent` is + :class:`akka.event.DummyClassForStringSources`. + + The SLF4J event listener treats this case specially (using the actual string + to look up the logger instance to use instead of the class’ name), and you + might want to do this also in case you implement your own loggin adapter. + Event Handler ============= -Logging is performed asynchronously through an event bus. You can configure which event handlers that should -subscribe to the logging events. That is done using the 'event-handlers' element in the :ref:`configuration`. -Here you can also define the log level. +Logging is performed asynchronously through an event bus. You can configure +which event handlers that should subscribe to the logging events. That is done +using the ``event-handlers`` element in the :ref:`configuration`. Here you can +also define the log level. .. code-block:: ruby @@ -50,7 +81,8 @@ Here you can also define the log level. loglevel = "DEBUG" } -The default one logs to STDOUT and is registered by default. It is not intended to be used for production. There is also an :ref:`slf4j-scala` +The default one logs to STDOUT and is registered by default. It is not intended +to be used for production. There is also an :ref:`slf4j-scala` event handler available in the 'akka-slf4j' module. Example of creating a listener: @@ -58,7 +90,6 @@ Example of creating a listener: .. includecode:: code/akka/docs/event/LoggingDocSpec.scala :include: my-event-listener - .. _slf4j-scala: SLF4J @@ -85,8 +116,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger. loglevel = "DEBUG" } -Logging thread in MDC ---------------------- +Logging Thread and Akka Source in MDC +------------------------------------- Since the logging is done asynchronously the thread in which the logging was performed is captured in Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``. @@ -98,3 +129,22 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi +.. note:: + + It will probably be a good idea to use the ``sourceThread`` MDC value also in + non-Akka parts of the application in order to have this property consistently + available in the logs. + +Another helpful facility is that Akka captures the actor’s address when +instantiating a logger within it, meaning that the full instance identification +is available for associating log messages e.g. with members of a router. This +information is available in the MDC with attribute name ``akkaSource``:: + + + + %date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n + + + +For more details on what this attribute contains—also for non-actors—please see +`How to Log`_. diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index ad06b67b8b..5b2ed24d28 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -44,9 +45,8 @@ You can also give the router already created routees as in: When you create a router programatically you define the number of routees *or* you pass already created routees to it. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. -*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in -the configuration file then this value will be used instead of any programmatically sent parameters, but you must -also define the ``router`` property in the configuration.* +*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used +instead of any programmatically sent parameters.* Once you have the router actor it is just to send messages to it as you would to any actor: @@ -123,6 +123,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the non-suspended routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/routing/RouterTypeExample.scala#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. @@ -240,6 +255,14 @@ All in all the custom router looks like this: If you are interested in how to use the VoteCountRouter you can have a look at the test class `RoutingSpec `_ +Configured Custom Router +************************ + +It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment +configuration you define the fully qualified class name of the router class. The router class must extend +``akka.routing.RouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter. +The deployment section of the configuration is passed to the constructor. + Custom Resizer ************** diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 1af4802552..6efa542e0e 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -153,7 +153,7 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { case message: DaemonMsg ⇒ - log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName) + log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName) message match { case DaemonMsgCreate(factory, path, supervisor) ⇒ import remote.remoteAddress diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5b748098ac..037f9d594a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -27,8 +27,6 @@ class RemoteActorRefProvider( val scheduler: Scheduler, _deadLetters: InternalActorRef) extends ActorRefProvider { - val log = Logging(eventStream, "RemoteActorRefProvider") - val remoteSettings = new RemoteSettings(settings.config, systemName) def rootGuardian = local.rootGuardian @@ -44,6 +42,8 @@ class RemoteActorRefProvider( val remote = new Remote(settings, remoteSettings) implicit val transports = remote.transports + val log = Logging(eventStream, "RemoteActorRefProvider(" + remote.remoteAddress + ")") + val rootPath: ActorPath = RootActorPath(remote.remoteAddress) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index d8f466c9d2..fe6844b8dc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -28,6 +28,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings val r = deploy.routing match { case RoundRobinRouter(x, _, resizer) ⇒ RemoteRoundRobinRouter(x, nodes, resizer) case RandomRouter(x, _, resizer) ⇒ RemoteRandomRouter(x, nodes, resizer) + case SmallestMailboxRouter(x, _, resizer) ⇒ RemoteSmallestMailboxRouter(x, nodes, resizer) case BroadcastRouter(x, _, resizer) ⇒ RemoteBroadcastRouter(x, nodes, resizer) case ScatterGatherFirstCompletedRouter(x, _, w, resizer) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer) } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 4624c9dc73..719261a5b6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -60,7 +60,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - log.debug("Sending message: {}", message) + log.debug("Sending message {} from {} to {}", message, senderOption, recipient) send((message, senderOption, recipient)) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala index 52b2d05618..83a64d09a7 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala @@ -82,6 +82,33 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], ove def this(resizer: Resizer) = this(0, Nil, Some(resizer)) } +/** + * A Router that tries to send to routee with fewest messages in mailbox. + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the random router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) + extends RemoteRouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets the routees to be used. + * Java API + */ + def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(0, Nil, Some(resizer)) +} + /** * A Router that uses broadcasts a message to all its connections. *
diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index 91f3123634..91a6cd7bf2 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -8,6 +8,7 @@ import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory } import org.slf4j.MDC import akka.event.Logging._ import akka.actor._ +import akka.event.DummyClassForStringSources /** * Base trait for all classes that wants to be able use the SLF4J logging infrastructure. @@ -19,6 +20,10 @@ trait SLF4JLogging { object Logger { def apply(logger: String): SLFLogger = SLFLoggerFactory getLogger logger + def apply(logClass: Class[_], logSource: String): SLFLogger = logClass match { + case c if c == classOf[DummyClassForStringSources] ⇒ apply(logSource) + case _ ⇒ SLFLoggerFactory getLogger logClass + } def root: SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME) } @@ -31,30 +36,31 @@ object Logger { class Slf4jEventHandler extends Actor with SLF4JLogging { val mdcThreadAttributeName = "sourceThread" + val mdcAkkaSourceAttributeName = "akkaSource" def receive = { - case event @ Error(cause, logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { + case event @ Error(cause, logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { cause match { - case Error.NoCause ⇒ Logger(logSource).error(message.toString) - case _ ⇒ Logger(logSource).error(message.toString, cause) + case Error.NoCause ⇒ Logger(logClass, logSource).error(message.toString) + case _ ⇒ Logger(logClass, logSource).error(message.toString, cause) } } - case event @ Warning(logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { - Logger(logSource).warn("{}", message.asInstanceOf[AnyRef]) + case event @ Warning(logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { + Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } - case event @ Info(logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { - Logger(logSource).info("{}", message.asInstanceOf[AnyRef]) + case event @ Info(logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { + Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } - case event @ Debug(logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { - Logger(logSource).debug("{}", message.asInstanceOf[AnyRef]) + case event @ Debug(logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { + Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } case InitializeLogger(_) ⇒ @@ -63,12 +69,14 @@ class Slf4jEventHandler extends Actor with SLF4JLogging { } @inline - final def withMdc(name: String, value: String)(logStatement: ⇒ Unit) { - MDC.put(name, value) + final def withMdc(logSource: String, thread: String)(logStatement: ⇒ Unit) { + MDC.put(mdcAkkaSourceAttributeName, logSource) + MDC.put(mdcThreadAttributeName, thread) try { logStatement } finally { - MDC.remove(name) + MDC.remove(mdcAkkaSourceAttributeName) + MDC.remove(mdcThreadAttributeName) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 7da8d84eba..3bee246e11 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -254,7 +254,7 @@ case class ErrorFilter( def matches(event: LogEvent) = { event match { - case Error(cause, src, msg) if throwable isInstance cause ⇒ + case Error(cause, src, _, msg) if throwable isInstance cause ⇒ (msg == null && cause.getMessage == null && cause.getStackTrace.length == 0) || doMatch(src, msg) || doMatch(src, cause.getMessage) case _ ⇒ false @@ -305,8 +305,8 @@ case class WarningFilter( def matches(event: LogEvent) = { event match { - case Warning(src, msg) ⇒ doMatch(src, msg) - case _ ⇒ false + case Warning(src, _, msg) ⇒ doMatch(src, msg) + case _ ⇒ false } } @@ -348,8 +348,8 @@ case class InfoFilter( def matches(event: LogEvent) = { event match { - case Info(src, msg) ⇒ doMatch(src, msg) - case _ ⇒ false + case Info(src, _, msg) ⇒ doMatch(src, msg) + case _ ⇒ false } } @@ -391,8 +391,8 @@ case class DebugFilter( def matches(event: LogEvent) = { event match { - case Debug(src, msg) ⇒ doMatch(src, msg) - case _ ⇒ false + case Debug(src, _, msg) ⇒ doMatch(src, msg) + case _ ⇒ false } } @@ -456,15 +456,15 @@ class TestEventListener extends Logging.DefaultLogger { case event: LogEvent ⇒ if (!filter(event)) print(event) case DeadLetter(msg: SystemMessage, _, rcp) ⇒ if (!msg.isInstanceOf[Terminate]) { - val event = Warning(rcp.path.toString, "received dead system message: " + msg) + val event = Warning(rcp.path.toString, rcp.getClass, "received dead system message: " + msg) if (!filter(event)) print(event) } case DeadLetter(msg, snd, rcp) ⇒ if (!msg.isInstanceOf[Terminated]) { - val event = Warning(rcp.path.toString, "received dead letter from " + snd + ": " + msg) + val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg) if (!filter(event)) print(event) } - case m ⇒ print(Debug(context.system.name, m)) + case m ⇒ print(Debug(context.system.name, this.getClass, m)) } def filter(event: LogEvent): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false }) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 747a9c90e9..d5c9b1a151 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -81,7 +81,7 @@ object TestActorRefSpec { var count = 0 var msg: String = _ def receive = { - case Warning(_, m: String) ⇒ count += 1; msg = m + case Warning(_, _, m: String) ⇒ count += 1; msg = m } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index ada3dbcbd2..e6a64ca0aa 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -31,7 +31,7 @@ object AkkaBuild extends Build { Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id) ), - aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, transactor, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs) + aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, transactor, mailboxes, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs) ) lazy val actor = Project( @@ -213,6 +213,13 @@ object AkkaBuild extends Build { ) ) + lazy val actorMigration = Project( + id = "akka-actor-migration", + base = file("akka-actor-migration"), + dependencies = Seq(actor, testkit % "test->test"), + settings = defaultSettings + ) + lazy val akkaSbtPlugin = Project( id = "akka-sbt-plugin", base = file("akka-sbt-plugin"), @@ -315,6 +322,8 @@ object AkkaBuild extends Build { if (true || (System getProperty "java.runtime.version" startsWith "1.7")) Seq() else Seq("-optimize")), // -optimize fails with jdk7 javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"), + ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet, + parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean, // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)