Merge branch 'master' into wip-1626-1627-typed-actors-√

This commit is contained in:
Viktor Klang 2012-01-16 15:48:39 +01:00
commit f4b9733c03
52 changed files with 1921 additions and 240 deletions

View file

@ -0,0 +1,64 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import java.io.File
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigResolveOptions
@deprecated("use ActorSystem instead", "2.0")
object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) {
start()
}
/**
* Loads configuration (akka.conf) from same location as Akka 1.x
*/
@deprecated("use default config location or write your own configuration loader", "2.0")
object OldConfigurationLoader {
val defaultConfig: Config = {
val cfg = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig
val config = cfg.withFallback(ConfigFactory.defaultReference)
config.checkValid(ConfigFactory.defaultReference, "akka")
config
}
// file extensions (.conf, .json, .properties), are handled by parseFileAnySyntax
val defaultLocation: String = (systemMode orElse envMode).map("akka." + _).getOrElse("akka")
private def envMode = System.getenv("AKKA_MODE") match {
case null | "" None
case value Some(value)
}
private def systemMode = System.getProperty("akka.mode") match {
case null | "" None
case value Some(value)
}
private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false)
private def fromProperties = try {
val property = Option(System.getProperty("akka.config"))
property.map(p
ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseFileAnySyntax(new File(p), configParseOptions)))
} catch { case _ None }
private def fromClasspath = try {
Option(ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseResourcesAnySyntax(ActorSystem.getClass, "/" + defaultLocation, configParseOptions)))
} catch { case _ None }
private def fromHome = try {
Option(ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseFileAnySyntax(new File(ActorSystem.GlobalHome.get + "/config/" + defaultLocation), configParseOptions)))
} catch { case _ None }
private def emptyConfig = ConfigFactory.systemProperties
}

View file

@ -0,0 +1,171 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.japi.Creator
import akka.util.Timeout
import akka.dispatch.Future
import akka.dispatch.OldFuture
import akka.util.Duration
import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress
/**
* Migration replacement for `object akka.actor.Actor`.
*/
@deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0")
object OldActor {
/**
* Creates an ActorRef out of the Actor with type T.
* It will be automatically started, i.e. remove old call to `start()`.
*
*/
@deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0")
def actorOf[T <: Actor: Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Creates an ActorRef out of the Actor of the specified Class.
* It will be automatically started, i.e. remove old call to `start()`.
*/
@deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0")
def actorOf(clazz: Class[_ <: Actor]): ActorRef = GlobalActorSystem.actorOf(Props(clazz))
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
*
* It will be automatically started, i.e. remove old call to `start()`.
*/
@deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0")
def actorOf(factory: Actor): ActorRef = GlobalActorSystem.actorOf(Props(factory))
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* <p/>
* JAVA API
*/
@deprecated("use ActorRefFactory (ActorSystem or ActorContext) to create actors", "2.0")
def actorOf(creator: Creator[Actor]): ActorRef = GlobalActorSystem.actorOf(Props(creator))
@deprecated("OldActor.remote should not be used", "2.0")
lazy val remote: OldRemoteSupport = new OldRemoteSupport
}
@deprecated("use Actor", "2.0")
abstract class OldActor extends Actor {
implicit def askTimeout: Timeout = akka.migration.askTimeout
implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = akka.migration.future2OldFuture(future)
implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef)
@deprecated("Use context.become instead", "2.0")
def become(behavior: Receive, discardOld: Boolean = true) = context.become(behavior, discardOld)
@deprecated("Use context.unbecome instead", "2.0")
def unbecome() = context.unbecome()
class OldActorRef(actorRef: ActorRef) {
@deprecated("Actors are automatically started when creatd, i.e. remove old call to start()", "2.0")
def start(): ActorRef = actorRef
@deprecated("Stop with ActorSystem or ActorContext instead", "2.0")
def exit() = stop()
@deprecated("Stop with ActorSystem or ActorContext instead", "2.0")
def stop(): Unit = context.stop(actorRef)
@deprecated("Use context.getReceiveTimeout instead", "2.0")
def getReceiveTimeout(): Option[Long] = context.receiveTimeout.map(_.toMillis)
@deprecated("Use context.setReceiveTimeout instead", "2.0")
def setReceiveTimeout(timeout: Long) = context.setReceiveTimeout(Duration(timeout, TimeUnit.MILLISECONDS))
@deprecated("Use context.getReceiveTimeout instead", "2.0")
def receiveTimeout: Option[Long] = getReceiveTimeout()
@deprecated("Use context.setReceiveTimeout instead", "2.0")
def receiveTimeout_=(timeout: Option[Long]) = setReceiveTimeout(timeout.getOrElse(0L))
@deprecated("Use self.isTerminated instead", "2.0")
def isShutdown: Boolean = self.isTerminated
@deprecated("Use sender instead", "2.0")
def channel() = context.sender
@deprecated("Use sender instead", "2.0")
def sender() = Some(context.sender)
@deprecated("Use sender ! instead", "2.0")
def reply(message: Any) = context.sender.!(message, context.self)
@deprecated("Use sender ! instead", "2.0")
def tryReply(message: Any): Boolean = {
reply(message)
true
}
@deprecated("Use sender ! instead", "2.0")
def tryTell(message: Any)(implicit sender: ActorRef = context.self): Boolean = {
actorRef.!(message)(sender)
true
}
@deprecated("Use sender ! akka.actor.Status.Failure(e) instead", "2.0")
def sendException(ex: Throwable): Boolean = {
context.sender.!(akka.actor.Status.Failure(ex), context.self)
true
}
}
}
class OldRemoteSupport {
@deprecated("remote.start is not needed", "2.0")
def start() {}
@deprecated("remote.start is not needed, use configuration to specify RemoteActorRefProvider, host and port", "2.0")
def start(host: String, port: Int) {}
@deprecated("remote.start is not needed, use configuration to specify RemoteActorRefProvider, host and port", "2.0")
def start(host: String, port: Int, loader: ClassLoader) {}
@deprecated("remote.shutdown is not needed", "2.0")
def shutdown() {}
@deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0")
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
GlobalActorSystem.actorFor("akka://%s@%s:%s/user/%s".format(GlobalActorSystem.name, hostname, port, classNameOrServiceId))
@deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0")
def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(classNameOrServiceId, hostname, port)
@deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0")
def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef =
actorFor(serviceId, hostname, port)
@deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0")
def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(serviceId, hostname, port)
@deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0")
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, hostname, port)
@deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0")
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(classNameOrServiceId, hostname, port)
@deprecated("use actorFor in ActorRefProvider (ActorSystem or ActorContext) instead", "2.0")
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(serviceId, hostname, port)
}

View file

@ -0,0 +1,75 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import java.util.concurrent.TimeUnit
import akka.util.Duration
/**
* Migration replacement for `object akka.actor.Scheduler`.
*/
@deprecated("use ActorSystem.scheduler instead", "2.0")
object OldScheduler {
/**
* Schedules to send the specified message to the receiver after initialDelay and then repeated after delay
*/
@deprecated("use ActorSystem.scheduler instead", "2.0")
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
GlobalActorSystem.scheduler.schedule(
Duration(initialDelay, timeUnit),
Duration(delay, timeUnit),
receiver,
message)
/**
* Schedules to run specified function to the receiver after initialDelay and then repeated after delay
*/
@deprecated("use ActorSystem.scheduler instead", "2.0")
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
GlobalActorSystem.scheduler.schedule(
Duration(initialDelay, timeUnit),
Duration(delay, timeUnit),
new Runnable { def run = f() })
/**
* Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay.
*/
@deprecated("use ActorSystem.scheduler instead", "2.0")
def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
GlobalActorSystem.scheduler.schedule(
Duration(initialDelay, timeUnit),
Duration(delay, timeUnit),
runnable)
/**
* Schedules to send the specified message to the receiver after delay
*/
@deprecated("use ActorSystem.scheduler instead", "2.0")
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable =
GlobalActorSystem.scheduler.scheduleOnce(
Duration(delay, timeUnit),
receiver,
message)
/**
* Schedules a function to be run after delay.
*/
@deprecated("use ActorSystem.scheduler instead", "2.0")
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable =
GlobalActorSystem.scheduler.scheduleOnce(
Duration(delay, timeUnit),
new Runnable { def run = f() })
/**
* Schedules a runnable to be run after delay,
*/
@deprecated("use ActorSystem.scheduler instead", "2.0")
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable =
GlobalActorSystem.scheduler.scheduleOnce(
Duration(delay, timeUnit),
runnable)
}

View file

@ -0,0 +1,162 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.config
import akka.actor.GlobalActorSystem
import com.typesafe.config.Config
/**
* Migration replacement for `object akka.config.Config`.
*/
@deprecated("use ActorSystem.settings.config instead", "2.0")
object OldConfig {
val config = new OldConfiguration(GlobalActorSystem.settings.config)
}
/**
* Migration adapter for `akka.config.Configuration`
*/
@deprecated("use ActorSystem.settings.config (com.typesafe.config.Config) instead", "2.0")
class OldConfiguration(config: Config) {
import scala.collection.JavaConverters._
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def contains(key: String): Boolean = config.hasPath(key)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def keys: Iterable[String] = config.root.keySet.asScala
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getAny(key: String): Option[Any] = {
try {
Option(config.getAnyRef(key))
} catch {
case _ None
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getAny(key: String, defaultValue: Any): Any = getAny(key).getOrElse(defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getSeqAny(key: String): Seq[Any] = {
try {
config.getAnyRefList(key).asScala
} catch {
case _ Seq.empty[Any]
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getString(key: String): Option[String] =
try {
Option(config.getString(key))
} catch {
case _ None
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getString(key: String, defaultValue: String): String = getString(key).getOrElse(defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getList(key: String): Seq[String] = {
try {
config.getStringList(key).asScala
} catch {
case _ Seq.empty[String]
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getInt(key: String): Option[Int] = {
try {
Option(config.getInt(key))
} catch {
case _ None
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getInt(key: String, defaultValue: Int): Int = getInt(key).getOrElse(defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getLong(key: String): Option[Long] = {
try {
Option(config.getLong(key))
} catch {
case _ None
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getLong(key: String, defaultValue: Long): Long = getLong(key).getOrElse(defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getFloat(key: String): Option[Float] = {
try {
Option(config.getDouble(key).toFloat)
} catch {
case _ None
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getFloat(key: String, defaultValue: Float): Float = getFloat(key).getOrElse(defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getDouble(key: String): Option[Double] = {
try {
Option(config.getDouble(key))
} catch {
case _ None
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getDouble(key: String, defaultValue: Double): Double = getDouble(key).getOrElse(defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getBoolean(key: String): Option[Boolean] = {
try {
Option(config.getBoolean(key))
} catch {
case _ None
}
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getBoolean(key: String, defaultValue: Boolean): Boolean = getBoolean(key).getOrElse(defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getBool(key: String): Option[Boolean] = getBoolean(key)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getBool(key: String, defaultValue: Boolean): Boolean = getBoolean(key, defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def apply(key: String): String = getString(key) match {
case None throw new ConfigurationException("undefined config: " + key)
case Some(v) v
}
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def apply(key: String, defaultValue: String) = getString(key, defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def apply(key: String, defaultValue: Int) = getInt(key, defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def apply(key: String, defaultValue: Long) = getLong(key, defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def apply(key: String, defaultValue: Boolean) = getBool(key, defaultValue)
@deprecated("use new com.typesafe.config.Config API instead", "2.0")
def getSection(name: String): Option[OldConfiguration] = {
try {
Option(new OldConfiguration(config.getConfig(name)))
} catch {
case _ None
}
}
}

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
import java.util.concurrent.TimeoutException
import akka.util.duration._
import akka.AkkaException
import akka.util.BoxedType
import akka.util.Duration
import akka.actor.GlobalActorSystem
/**
* Some old methods made available through implicit conversion in
* [[akka.migration]].
*/
@deprecated("use new Future api instead", "2.0")
class OldFuture[T](future: Future[T]) {
@deprecated("use akka.dispatch.Await.result instead", "2.0")
def get: T = try {
Await.result(future, GlobalActorSystem.settings.ActorTimeout.duration)
} catch {
case e: TimeoutException throw new FutureTimeoutException(e.getMessage, e)
}
@deprecated("use akka.dispatch.Await.ready instead", "2.0")
def await: Future[T] = await(GlobalActorSystem.settings.ActorTimeout.duration)
@deprecated("use akka.dispatch.Await.ready instead", "2.0")
def await(atMost: Duration) = try {
Await.ready(future, atMost)
future
} catch {
case e: TimeoutException throw new FutureTimeoutException(e.getMessage, e)
}
@deprecated("use new Future api instead", "2.0")
def as[A](implicit m: Manifest[A]): Option[A] = {
try await catch { case _: FutureTimeoutException }
future.value match {
case None None
case Some(Left(ex)) throw ex
case Some(Right(v)) Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
}
}
@deprecated("use new Future api instead", "2.0")
def asSilently[A](implicit m: Manifest[A]): Option[A] = {
try await catch { case _: FutureTimeoutException }
future.value match {
case None None
case Some(Left(ex)) throw ex
case Some(Right(v))
try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
catch { case _: ClassCastException None }
}
}
}
@deprecated("Await throws java.util.concurrent.TimeoutException", "2.0")
class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(message: String) = this(message, null)
}

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import akka.actor.GlobalActorSystem
/**
* Migration replacement for `akka.event.EventHandler`
*/
@deprecated("use Logging instead", "2.0")
object OldEventHandler {
@deprecated("use Logging instead", "2.0")
def error(cause: Throwable, instance: AnyRef, message: String) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isErrorEnabled) log.error(cause, message)
}
@deprecated("use Logging instead", "2.0")
def error(cause: Throwable, instance: AnyRef, message: Any) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isErrorEnabled) log.error(cause, message.toString)
}
@deprecated("use Logging instead", "2.0")
def error(instance: AnyRef, message: String) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isErrorEnabled) log.error(message.toString)
}
@deprecated("use Logging instead", "2.0")
def error(instance: AnyRef, message: Any) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isErrorEnabled) log.error(message.toString)
}
@deprecated("use Logging instead", "2.0")
def warning(instance: AnyRef, message: String) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isWarningEnabled) log.warning(message)
}
@deprecated("use Logging instead", "2.0")
def warning(instance: AnyRef, message: Any) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isWarningEnabled) log.warning(message.toString)
}
@deprecated("use Logging instead", "2.0")
def info(instance: AnyRef, message: String) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isInfoEnabled) log.info(message)
}
@deprecated("use Logging instead", "2.0")
def info(instance: AnyRef, message: Any) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isInfoEnabled) log.info(message.toString)
}
@deprecated("use Logging instead", "2.0")
def debug(instance: AnyRef, message: String) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isDebugEnabled) log.debug(message)
}
@deprecated("use Logging instead", "2.0")
def debug(instance: AnyRef, message: Any) {
val log = Logging.getLogger(GlobalActorSystem, instance)
if (log.isDebugEnabled) log.debug(message.toString)
}
@deprecated("use Logging instead", "2.0")
def isInfoEnabled = Logging.getLogger(GlobalActorSystem, this).isInfoEnabled
@deprecated("use Logging instead", "2.0")
def isDebugEnabled = Logging.getLogger(GlobalActorSystem, this).isDebugEnabled
}

View file

@ -0,0 +1,34 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import akka.dispatch.Future
import akka.dispatch.OldFuture
import akka.util.Timeout
import akka.actor.GlobalActorSystem
import akka.dispatch.MessageDispatcher
import akka.actor.ActorRef
package object migration {
implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = new OldFuture[T](future)
implicit def askTimeout: Timeout = GlobalActorSystem.settings.ActorTimeout
implicit def defaultDispatcher: MessageDispatcher = GlobalActorSystem.dispatcher
implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef)
class OldActorRef(actorRef: ActorRef) {
@deprecated("Actors are automatically started when creatd, i.e. remove old call to start()", "2.0")
def start(): ActorRef = actorRef
@deprecated("Stop with ActorSystem or ActorContext instead", "2.0")
def exit() = stop()
@deprecated("Stop with ActorSystem or ActorContext instead", "2.0")
def stop(): Unit = GlobalActorSystem.stop(actorRef)
}
}

View file

@ -33,19 +33,26 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
} }
"notify with one Terminated message when an Actor is stopped" in { "notify with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props(context { case _ })) val terminal = system.actorOf(Props.empty)
startWatching(terminal) startWatching(terminal) ! "hallo"
expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill
testActor ! "ping"
expectMsg("ping")
terminal ! PoisonPill terminal ! PoisonPill
expectTerminationOf(terminal) expectTerminationOf(terminal)
} }
"notify with one Terminated message when an Actor is already dead" in {
val terminal = system.actorOf(Props.empty)
terminal ! PoisonPill
startWatching(terminal)
expectTerminationOf(terminal)
}
"notify with all monitors with one Terminated message when an Actor is stopped" in { "notify with all monitors with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props(context { case _ })) val terminal = system.actorOf(Props.empty)
val monitor1, monitor2, monitor3 = startWatching(terminal) val monitor1, monitor2, monitor3 = startWatching(terminal)
terminal ! PoisonPill terminal ! PoisonPill
@ -60,7 +67,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
} }
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in { "notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props(context { case _ })) val terminal = system.actorOf(Props.empty)
val monitor1, monitor3 = startWatching(terminal) val monitor1, monitor3 = startWatching(terminal)
val monitor2 = system.actorOf(Props(new Actor { val monitor2 = system.actorOf(Props(new Actor {
context.watch(terminal) context.watch(terminal)

View file

@ -171,7 +171,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
system.eventStream.subscribe(testActor, classOf[Logging.Error]) system.eventStream.subscribe(testActor, classOf[Logging.Error])
fsm ! "go" fsm ! "go"
expectMsgPF(1 second, hint = "Next state 2 does not exist") { expectMsgPF(1 second, hint = "Next state 2 does not exist") {
case Logging.Error(_, `name`, "Next state 2 does not exist") true case Logging.Error(_, `name`, _, "Next state 2 does not exist") true
} }
system.eventStream.unsubscribe(testActor) system.eventStream.unsubscribe(testActor)
} }
@ -218,18 +218,19 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
} }
}) })
val name = fsm.path.toString val name = fsm.path.toString
val fsmClass = fsm.underlyingActor.getClass
system.eventStream.subscribe(testActor, classOf[Logging.Debug]) system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go" fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") { expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
} }
expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown")) expectMsg(1 second, Logging.Debug(name, fsmClass, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2")) expectMsg(1 second, Logging.Debug(name, fsmClass, "transition 1 -> 2"))
fsm ! "stop" fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") { expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
} }
expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal) expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), Normal)
expectNoMsg(1 second) expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor) system.eventStream.unsubscribe(testActor)
} }

View file

@ -151,7 +151,7 @@ object ActorModelSpec {
await(deadline)(stops == dispatcher.stops.get) await(deadline)(stops == dispatcher.stops.get)
} catch { } catch {
case e case e
system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get + system.eventStream.publish(Error(e, dispatcher.toString, dispatcher.getClass, "actual: stops=" + dispatcher.stops.get +
" required: stops=" + stops)) " required: stops=" + stops))
throw e throw e
} }
@ -208,9 +208,12 @@ object ActorModelSpec {
await(deadline)(stats.restarts.get() == restarts) await(deadline)(stats.restarts.get() == restarts)
} catch { } catch {
case e case e
system.eventStream.publish(Error(e, Option(dispatcher).toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + system.eventStream.publish(Error(e,
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + Option(dispatcher).toString,
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) (Option(dispatcher) getOrElse this).getClass,
"actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
throw e throw e
} }
} }
@ -311,7 +314,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
try { try {
f f
} catch { } catch {
case e system.eventStream.publish(Error(e, "spawn", "error in spawned thread")) case e system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread"))
} }
} }
} }

View file

@ -108,7 +108,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
import Logging._ import Logging._
val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error")) val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error"))
val msg = allmsg filter (_.level <= level) val msg = allmsg filter (_.level <= level)
allmsg foreach bus.publish allmsg foreach bus.publish
msg foreach (x expectMsg(x)) msg foreach (x expectMsg(x))

View file

@ -59,7 +59,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
} }
val log = LoggingReceive("funky")(r) val log = LoggingReceive("funky")(r)
log.isDefinedAt("hallo") log.isDefinedAt("hallo")
expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo")) expectMsg(1 second, Logging.Debug("funky", classOf[DummyClassForStringSources], "received unhandled message hallo"))
} }
} }
@ -83,7 +83,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val name = actor.path.toString val name = actor.path.toString
actor ! "buh" actor ! "buh"
within(1 second) { within(1 second) {
expectMsg(Logging.Debug(name, "received handled message buh")) expectMsg(Logging.Debug(name, actor.underlyingActor.getClass, "received handled message buh"))
expectMsg("x") expectMsg("x")
} }
@ -109,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
}) })
actor ! "buh" actor ! "buh"
within(1 second) { within(1 second) {
expectMsg(Logging.Debug(actor.path.toString, "received handled message buh")) expectMsg(Logging.Debug(actor.path.toString, actor.underlyingActor.getClass, "received handled message buh"))
expectMsg("x") expectMsg("x")
} }
} }
@ -130,7 +130,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val name = actor.path.toString val name = actor.path.toString
actor ! PoisonPill actor ! PoisonPill
expectMsgPF() { expectMsgPF() {
case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true case Logging.Debug(`name`, _, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true
} }
awaitCond(actor.isTerminated, 100 millis) awaitCond(actor.isTerminated, 100 millis)
} }
@ -142,7 +142,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val sys = impl.systemGuardian.path.toString val sys = impl.systemGuardian.path.toString
ignoreMute(this) ignoreMute(this)
ignoreMsg { ignoreMsg {
case Logging.Debug(s, _) s.contains("MainBusReaper") || s == sys case Logging.Debug(`sys`, _, _) true
} }
system.eventStream.subscribe(testActor, classOf[Logging.Debug]) system.eventStream.subscribe(testActor, classOf[Logging.Debug])
system.eventStream.subscribe(testActor, classOf[Logging.Error]) system.eventStream.subscribe(testActor, classOf[Logging.Error])
@ -151,51 +151,53 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val lname = lifecycleGuardian.path.toString val lname = lifecycleGuardian.path.toString
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
val sname = supervisor.path.toString val sname = supervisor.path.toString
val sclass = classOf[TestLogActor]
val supervisorSet = receiveWhile(messages = 2) { val supervisorSet = receiveWhile(messages = 2) {
case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" 1 case Logging.Debug(`lname`, _, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`sname`, msg: String) if msg startsWith "started" 2 case Logging.Debug(`sname`, `sclass`, msg: String) if msg startsWith "started" 2
}.toSet }.toSet
expectNoMsg(Duration.Zero) expectNoMsg(Duration.Zero)
assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)") assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)")
val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none") val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none")
val aname = actor.path.toString val aname = actor.path.toString
val aclass = classOf[TestLogActor]
val set = receiveWhile(messages = 2) { val set = receiveWhile(messages = 2) {
case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" 1 case Logging.Debug(`sname`, _, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`aname`, msg: String) if msg startsWith "started" 2 case Logging.Debug(`aname`, `aclass`, msg: String) if msg startsWith "started" 2
}.toSet }.toSet
expectNoMsg(Duration.Zero) expectNoMsg(Duration.Zero)
assert(set == Set(1, 2), set + " was not Set(1, 2)") assert(set == Set(1, 2), set + " was not Set(1, 2)")
supervisor watch actor supervisor watch actor
expectMsgPF(hint = "now monitoring") { expectMsgPF(hint = "now monitoring") {
case Logging.Debug(ref, msg: String) case Logging.Debug(ref, `sclass`, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("now monitoring") ref == supervisor.underlyingActor && msg.startsWith("now monitoring")
} }
supervisor unwatch actor supervisor unwatch actor
expectMsgPF(hint = "stopped monitoring") { expectMsgPF(hint = "stopped monitoring") {
case Logging.Debug(ref, msg: String) case Logging.Debug(ref, `sclass`, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring") ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring")
} }
EventFilter[ActorKilledException](occurrences = 1) intercept { EventFilter[ActorKilledException](occurrences = 1) intercept {
actor ! Kill actor ! Kill
val set = receiveWhile(messages = 3) { val set = receiveWhile(messages = 3) {
case Logging.Error(_: ActorKilledException, `aname`, "Kill") 1 case Logging.Error(_: ActorKilledException, `aname`, `aclass`, "Kill") 1
case Logging.Debug(`aname`, "restarting") 2 case Logging.Debug(`aname`, `aclass`, "restarting") 2
case Logging.Debug(`aname`, "restarted") 3 case Logging.Debug(`aname`, `aclass`, "restarted") 3
}.toSet }.toSet
expectNoMsg(Duration.Zero) expectNoMsg(Duration.Zero)
assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)")
} }
system.stop(supervisor) system.stop(supervisor)
expectMsg(Logging.Debug(sname, "stopping")) expectMsg(Logging.Debug(sname, `sclass`, "stopping"))
expectMsg(Logging.Debug(aname, "stopped")) expectMsg(Logging.Debug(aname, `aclass`, "stopped"))
expectMsg(Logging.Debug(sname, "stopped")) expectMsg(Logging.Debug(sname, `sclass`, "stopped"))
} }
} }
} }

View file

@ -135,15 +135,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer)))) val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
val latch1 = new TestLatch(1) val latch1 = new TestLatch(1)
router.!((latch1, busy)) router ! (latch1, busy)
Await.ready(latch1, 2 seconds) Await.ready(latch1, 2 seconds)
val latch2 = new TestLatch(1) val latch2 = new TestLatch(1)
router.!((latch2, busy)) router ! (latch2, busy)
Await.ready(latch2, 2 seconds) Await.ready(latch2, 2 seconds)
val latch3 = new TestLatch(1) val latch3 = new TestLatch(1)
router.!((latch3, busy)) router ! (latch3, busy)
Await.ready(latch3, 2 seconds) Await.ready(latch3, 2 seconds)
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
@ -178,7 +178,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2)
def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = {
count.set(0) (10 millis).dilated.sleep
for (m 0 until loops) { for (m 0 until loops) {
router.!((t, latch, count)) router.!((t, latch, count))
(10 millis).dilated.sleep (10 millis).dilated.sleep

View file

@ -12,6 +12,8 @@ import akka.dispatch.Await
import akka.util.Duration import akka.util.Duration
import akka.config.ConfigurationException import akka.config.ConfigurationException
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
object RoutingSpec { object RoutingSpec {
@ -21,6 +23,10 @@ object RoutingSpec {
router = round-robin router = round-robin
nr-of-instances = 3 nr-of-instances = 3
} }
/myrouter {
router = "akka.routing.RoutingSpec$MyRouter"
foo = bar
}
} }
""" """
@ -37,6 +43,18 @@ object RoutingSpec {
} }
} }
class MyRouter(config: Config) extends RouterConfig {
val foo = config.getString("foo")
def createRoute(routeeProps: Props, actorContext: ActorContext): Route = {
val routees = IndexedSeq(actorContext.actorOf(Props[Echo]))
registerRoutees(actorContext, routees)
{
case (sender, message) Nil
}
}
}
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -256,6 +274,61 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
} }
} }
"smallest mailbox router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1)))
routedActor.isTerminated must be(false)
}
"deliver messages to idle actor" in {
val usedActors = new ConcurrentHashMap[Int, String]()
val router = system.actorOf(Props(new Actor {
def receive = {
case (busy: TestLatch, receivedLatch: TestLatch)
usedActors.put(0, self.path.toString)
self ! "another in busy mailbox"
receivedLatch.countDown()
Await.ready(busy, TestLatch.DefaultTimeout)
case (msg: Int, receivedLatch: TestLatch)
usedActors.put(msg, self.path.toString)
receivedLatch.countDown()
case s: String
}
}).withRouter(SmallestMailboxRouter(3)))
val busy = TestLatch(1)
val received0 = TestLatch(1)
router ! (busy, received0)
Await.ready(received0, TestLatch.DefaultTimeout)
val received1 = TestLatch(1)
router ! (1, received1)
Await.ready(received1, TestLatch.DefaultTimeout)
val received2 = TestLatch(1)
router ! (2, received2)
Await.ready(received2, TestLatch.DefaultTimeout)
val received3 = TestLatch(1)
router ! (3, received3)
Await.ready(received3, TestLatch.DefaultTimeout)
busy.countDown()
val busyPath = usedActors.get(0)
busyPath must not be (null)
val path1 = usedActors.get(1)
val path2 = usedActors.get(2)
val path3 = usedActors.get(3)
path1 must not be (busyPath)
path2 must not be (busyPath)
path3 must not be (busyPath)
}
}
"broadcast router" must { "broadcast router" must {
"be started when constructed" in { "be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1))) val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1)))
@ -409,6 +482,10 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
sys.shutdown() sys.shutdown()
} }
} }
"support custom router" in {
val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter")
myrouter.isTerminated must be(false)
}
} }
"custom router" must { "custom router" must {

View file

@ -64,8 +64,10 @@ akka {
default { default {
# routing (load-balance) scheme to use # routing (load-balance) scheme to use
# available: "from-code", "round-robin", "random", "scatter-gather", "broadcast" # available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast"
# or: fully qualified class name of the router class # or: Fully qualified class name of the router class.
# The router class must extend akka.routing.CustomRouterConfig and and have constructor
# with com.typesafe.config.Config parameter.
# default is "from-code"; # default is "from-code";
# Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter). # Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter).
# The type of router can be overridden in the configuration; specifying "from-code" means # The type of router can be overridden in the configuration; specifying "from-code" means

View file

@ -112,7 +112,7 @@ object Status {
} }
trait ActorLogging { this: Actor trait ActorLogging { this: Actor
val log = akka.event.Logging(context.system.eventStream, context.self) val log = akka.event.Logging(context.system, context.self)
} }
object Actor { object Actor {

View file

@ -358,12 +358,12 @@ private[akka] class ActorCell(
actor = created actor = created
created.preStart() created.preStart()
checkReceiveTimeout checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "started (" + actor + ")")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch { } catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418 // FIXME catching all and continue isn't good for OOME, ticket #1418
case e case e
try { try {
system.eventStream.publish(Error(e, self.path.toString, "error while creating actor")) system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted // prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this) dispatcher.suspend(this)
} finally { } finally {
@ -373,7 +373,7 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = try { def recreate(cause: Throwable): Unit = try {
val failedActor = actor val failedActor = actor
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarting")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
val freshActor = newActor() val freshActor = newActor()
if (failedActor ne null) { if (failedActor ne null) {
val c = currentMessage //One read only plz val c = currentMessage //One read only plz
@ -388,7 +388,7 @@ private[akka] class ActorCell(
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
hotswap = Props.noHotSwap // Reset the behavior hotswap = Props.noHotSwap // Reset the behavior
freshActor.postRestart(cause) freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarted")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
dispatcher.resume(this) //FIXME should this be moved down? dispatcher.resume(this) //FIXME should this be moved down?
@ -396,7 +396,7 @@ private[akka] class ActorCell(
} catch { } catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418 // FIXME catching all and continue isn't good for OOME, ticket #1418
case e try { case e try {
system.eventStream.publish(Error(e, self.path.toString, "error while creating actor")) system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted // prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this) dispatcher.suspend(this)
} finally { } finally {
@ -417,7 +417,7 @@ private[akka] class ActorCell(
else { else {
// do not process normal messages while waiting for all children to terminate // do not process normal messages while waiting for all children to terminate
dispatcher suspend this dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
// do not use stop(child) because that would dissociate the children from us, but we still want to wait for them // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them
for (child c) child.asInstanceOf[InternalActorRef].stop() for (child c) child.asInstanceOf[InternalActorRef].stop()
stopping = true stopping = true
@ -428,12 +428,12 @@ private[akka] class ActorCell(
childrenRefs.get(child.path.name) match { childrenRefs.get(child.path.name) match {
case None case None
childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child)) childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child)) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(`child`, _, _)) case Some(ChildRestartStats(`child`, _, _))
// this is the nominal case where we created the child and entered it in actorCreated() above // this is the nominal case where we created the child and entered it in actorCreated() above
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child)) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(c, _, _)) case Some(ChildRestartStats(c, _, _))
system.eventStream.publish(Warning(self.path.toString, "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child)) system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child))
} }
} }
@ -448,10 +448,10 @@ private[akka] class ActorCell(
case Recreate(cause) recreate(cause) case Recreate(cause) recreate(cause)
case Link(subject) case Link(subject)
system.deathWatch.subscribe(self, subject) system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now monitoring " + subject)) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject))
case Unlink(subject) case Unlink(subject)
system.deathWatch.unsubscribe(self, subject) system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject)) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject))
case Suspend() suspend() case Suspend() suspend()
case Resume() resume() case Resume() resume()
case Terminate() terminate() case Terminate() terminate()
@ -460,7 +460,7 @@ private[akka] class ActorCell(
} }
} catch { } catch {
case e //Should we really catch everything here? case e //Should we really catch everything here?
system.eventStream.publish(Error(e, self.path.toString, "error while processing " + message)) system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message))
//TODO FIXME How should problems here be handled??? //TODO FIXME How should problems here be handled???
throw e throw e
} }
@ -480,7 +480,7 @@ private[akka] class ActorCell(
currentMessage = null // reset current message after successful invocation currentMessage = null // reset current message after successful invocation
} catch { } catch {
case e case e
system.eventStream.publish(Error(e, self.path.toString, e.getMessage)) system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
// prevent any further messages to be processed until the actor has been restarted // prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this) dispatcher.suspend(this)
@ -500,7 +500,7 @@ private[akka] class ActorCell(
} }
} catch { } catch {
case e case e
system.eventStream.publish(Error(e, self.path.toString, e.getMessage)) system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
throw e throw e
} }
} }
@ -530,7 +530,8 @@ private[akka] class ActorCell(
} }
def autoReceiveMessage(msg: Envelope) { def autoReceiveMessage(msg: Envelope) {
if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg)) if (system.settings.DebugAutoReceive)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match { msg.message match {
case Failed(cause) handleFailure(sender, cause) case Failed(cause) handleFailure(sender, cause)
@ -554,7 +555,8 @@ private[akka] class ActorCell(
try { try {
parent.sendSystemMessage(ChildTerminated(self)) parent.sendSystemMessage(ChildTerminated(self))
system.deathWatch.publish(Terminated(self)) system.deathWatch.publish(Terminated(self))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped")) if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) // FIXME: can actor be null?
} finally { } finally {
currentMessage = null currentMessage = null
clearActorFields() clearActorFields()
@ -565,8 +567,8 @@ private[akka] class ActorCell(
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match {
case Some(stats) if stats.child == child if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause case Some(stats) if stats.child == child if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause
case Some(stats) system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) case Some(stats) system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
case None system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child)) case None system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
} }
final def handleChildTerminated(child: ActorRef): Unit = { final def handleChildTerminated(child: ActorRef): Unit = {
@ -625,4 +627,9 @@ private[akka] class ActorCell(
lookupAndSetField(a.getClass, a, "self", self) lookupAndSetField(a.getClass, a, "self", self)
} }
} }
private def clazz(o: AnyRef): Class[_] = {
if (o eq null) this.getClass
else o.getClass
}
} }

View file

@ -449,7 +449,10 @@ object DeadLetterActorRef {
val serialized = new SerializedDeadLetterActorRef val serialized = new SerializedDeadLetterActorRef
} }
class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { trait DeadLetterActorRefLike extends MinimalActorRef {
def eventStream: EventStream
@volatile @volatile
private var brokenPromise: Future[Any] = _ private var brokenPromise: Future[Any] = _
@volatile @volatile
@ -477,7 +480,9 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
assert(brokenPromise != null) assert(brokenPromise != null)
brokenPromise brokenPromise
} }
}
class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike {
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
} }
@ -486,8 +491,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
* This special dead letter reference has a name: it is that which is returned * This special dead letter reference has a name: it is that which is returned
* by a local look-up which is unsuccessful. * by a local look-up which is unsuccessful.
*/ */
class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath)
extends DeadLetterActorRef(_eventStream) { extends DeadLetterActorRefLike {
init(_dispatcher, _path) init(_dispatcher, _path)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter // do NOT form endless loops case d: DeadLetter // do NOT form endless loops

View file

@ -296,7 +296,7 @@ class LocalActorRefProvider(
val nodename: String = "local" val nodename: String = "local"
val clustername: String = "local" val clustername: String = "local"
val log = Logging(eventStream, "LocalActorRefProvider") val log = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")")
/* /*
* generate name for temporary actor refs * generate name for temporary actor refs

View file

@ -330,7 +330,11 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
// this provides basic logging (to stdout) until .start() is called below // this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugEventStream) val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(settings) eventStream.startStdoutLogger(settings)
val log = new BusLogging(eventStream, "ActorSystem") // this used only for .getClass in tagging messages
// unfortunately we need logging before we know the rootpath address, which wants to be inserted here
@volatile
private var _log = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass)
def log = _log
val scheduler = createScheduler() val scheduler = createScheduler()
@ -383,6 +387,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
private lazy val _start: this.type = { private lazy val _start: this.type = {
// the provider is expected to start default loggers, LocalActorRefProvider does this // the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this) provider.init(this)
_log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass)
deadLetters.init(dispatcher, lookupRoot.path / "deadLetters") deadLetters.init(dispatcher, lookupRoot.path / "deadLetters")
// this starts the reaper actor and the user-configured logging subscribers, which are also actors // this starts the reaper actor and the user-configured logging subscribers, which are also actors
registerOnTermination(stopScheduler()) registerOnTermination(stopScheduler())
@ -498,4 +503,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
} }
} }
override def toString = lookupRoot.path.root.address.toString
} }

View file

@ -13,6 +13,7 @@ import akka.event.EventStream
import com.typesafe.config._ import com.typesafe.config._
import akka.routing._ import akka.routing._
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap } import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
import akka.util.ReflectiveAccess
case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope)
@ -56,27 +57,29 @@ class Deployer(val settings: ActorSystem.Settings) {
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
val resizer: Option[Resizer] = if (config.hasPath("resizer")) { val resizer: Option[Resizer] = if (config.hasPath("resizer")) {
val resizerConfig = deployment.getConfig("resizer") Some(DefaultResizer(deployment.getConfig("resizer")))
Some(DefaultResizer(
lowerBound = resizerConfig.getInt("lower-bound"),
upperBound = resizerConfig.getInt("upper-bound"),
pressureThreshold = resizerConfig.getInt("pressure-threshold"),
rampupRate = resizerConfig.getDouble("rampup-rate"),
backoffThreshold = resizerConfig.getDouble("backoff-threshold"),
backoffRate = resizerConfig.getDouble("backoff-rate"),
stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS),
messagesPerResize = resizerConfig.getInt("messages-per-resize")))
} else { } else {
None None
} }
val router: RouterConfig = deployment.getString("router") match { val router: RouterConfig = deployment.getString("router") match {
case "from-code" NoRouter case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer) case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, resizer) case "random" RandomRouter(nrOfInstances, routees, resizer)
case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) case "smallest-mailbox" SmallestMailboxRouter(nrOfInstances, routees, resizer)
case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer) case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case x throw new ConfigurationException("unknown router type " + x + " for path " + key) case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case fqn
val constructorSignature = Array[Class[_]](classOf[Config])
ReflectiveAccess.createInstance[RouterConfig](fqn, constructorSignature, Array[AnyRef](deployment)) match {
case Right(router) router
case Left(exception)
throw new IllegalArgumentException(
("Cannot instantiate router [%s], defined in [%s], " +
"make sure it extends [akka.routing.RouterConfig] and has constructor with " +
"[com.typesafe.config.Config] parameter")
.format(fqn, key), exception)
}
} }
val recipe: Option[ActorRecipe] = val recipe: Option[ActorRecipe] =

View file

@ -190,7 +190,7 @@ trait FSM[S, D] extends Listeners {
type Timeout = Option[Duration] type Timeout = Option[Duration]
type TransitionHandler = PartialFunction[(S, S), Unit] type TransitionHandler = PartialFunction[(S, S), Unit]
val log = Logging(context.system, context.self) val log = Logging(context.system, this)
/** /**
* **************************************** * ****************************************

View file

@ -80,7 +80,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl
runnable.run() runnable.run()
} catch { } catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418 // FIXME catching all and continue isn't good for OOME, ticket #1418
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) case e eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
} finally { } finally {
cleanup() cleanup()
} }

View file

@ -59,7 +59,7 @@ class Dispatcher(
executorService.get() execute invocation executorService.get() execute invocation
} catch { } catch {
case e2: RejectedExecutionException case e2: RejectedExecutionException
prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString)) prerequisites.eventStream.publish(Warning("Dispatcher", this.getClass, e2.toString))
throw e2 throw e2
} }
} }

View file

@ -77,7 +77,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
} else { } else {
// Note that the configurator of the default dispatcher will be registered for this id, // Note that the configurator of the default dispatcher will be registered for this id,
// so this will only be logged once, which is crucial. // so this will only be logged once, which is crucial.
prerequisites.eventStream.publish(Warning("Dispatchers", prerequisites.eventStream.publish(Warning("Dispatchers", this.getClass,
"Dispatcher [%s] not configured, using default-dispatcher".format(id))) "Dispatcher [%s] not configured, using default-dispatcher".format(id)))
lookupConfigurator(DefaultDispatcherId) lookupConfigurator(DefaultDispatcherId)
} }

View file

@ -325,7 +325,7 @@ object Future {
// FIXME catching all and continue isn't good for OOME, ticket #1418 // FIXME catching all and continue isn't good for OOME, ticket #1418
executor match { executor match {
case m: MessageDispatcher case m: MessageDispatcher
m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage)) m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", this.getClass, e.getMessage))
case other case other
e.printStackTrace() e.printStackTrace()
} }
@ -566,7 +566,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
protected def logError(msg: String, problem: Throwable): Unit = { protected def logError(msg: String, problem: Throwable): Unit = {
executor match { executor match {
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, msg, this.getClass, problem.getMessage))
case other problem.printStackTrace() case other problem.printStackTrace()
} }
} }

View file

@ -214,7 +214,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
} }
} catch { } catch {
case e case e
actor.system.eventStream.publish(Error(e, actor.self.path.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) actor.system.eventStream.publish(Error(e, actor.self.path.toString, actor.actor.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
throw e throw e
} }
} }

View file

@ -38,19 +38,19 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
} }
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel)) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
super.subscribe(subscriber, channel) super.subscribe(subscriber, channel)
} }
override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
val ret = super.unsubscribe(subscriber, channel) val ret = super.unsubscribe(subscriber, channel)
if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel)) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel))
ret ret
} }
override def unsubscribe(subscriber: ActorRef) { override def unsubscribe(subscriber: ActorRef) {
super.unsubscribe(subscriber) super.unsubscribe(subscriber)
if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels")) if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels"))
} }
} }

View file

@ -16,10 +16,6 @@ import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.dispatch.Await import akka.dispatch.Await
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
}
/** /**
* This trait brings log level handling to the EventStream: it reads the log * This trait brings log level handling to the EventStream: it reads the log
* levels for the initial logging (StandardOutLogger) and the loggers & level * levels for the initial logging (StandardOutLogger) and the loggers & level
@ -75,7 +71,7 @@ trait LoggingBus extends ActorEventBus {
*/ */
private[akka] def startStdoutLogger(config: Settings) { private[akka] def startStdoutLogger(config: Settings) {
val level = levelFor(config.StdoutLogLevel) getOrElse { val level = levelFor(config.StdoutLogLevel) getOrElse {
StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), this.getClass, "unknown akka.stdout-loglevel " + config.StdoutLogLevel))
ErrorLevel ErrorLevel
} }
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l))) AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
@ -83,15 +79,16 @@ trait LoggingBus extends ActorEventBus {
loggers = Seq(StandardOutLogger) loggers = Seq(StandardOutLogger)
_logLevel = level _logLevel = level
} }
publish(Debug(simpleName(this), "StandardOutLogger started")) publish(Debug(simpleName(this), this.getClass, "StandardOutLogger started"))
} }
/** /**
* Internal Akka use only * Internal Akka use only
*/ */
private[akka] def startDefaultLoggers(system: ActorSystemImpl) { private[akka] def startDefaultLoggers(system: ActorSystemImpl) {
val logName = simpleName(this) + "(" + system + ")"
val level = levelFor(system.settings.LogLevel) getOrElse { val level = levelFor(system.settings.LogLevel) getOrElse {
StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel)) StandardOutLogger.print(Error(new EventHandlerException, logName, this.getClass, "unknown akka.stdout-loglevel " + system.settings.LogLevel))
ErrorLevel ErrorLevel
} }
try { try {
@ -105,7 +102,7 @@ trait LoggingBus extends ActorEventBus {
} yield { } yield {
try { try {
ReflectiveAccess.getClassFor[Actor](loggerName) match { ReflectiveAccess.getClassFor[Actor](loggerName) match {
case Right(actorClass) addLogger(system, actorClass, level) case Right(actorClass) addLogger(system, actorClass, level, logName)
case Left(exception) throw exception case Left(exception) throw exception
} }
} catch { } catch {
@ -119,7 +116,7 @@ trait LoggingBus extends ActorEventBus {
loggers = myloggers loggers = myloggers
_logLevel = level _logLevel = level
} }
publish(Debug(simpleName(this), "Default Loggers started")) publish(Debug(logName, this.getClass, "Default Loggers started"))
if (!(defaultLoggers contains StandardOutLoggerName)) { if (!(defaultLoggers contains StandardOutLoggerName)) {
unsubscribe(StandardOutLogger) unsubscribe(StandardOutLogger)
} }
@ -138,7 +135,7 @@ trait LoggingBus extends ActorEventBus {
val level = _logLevel // volatile access before reading loggers val level = _logLevel // volatile access before reading loggers
if (!(loggers contains StandardOutLogger)) { if (!(loggers contains StandardOutLogger)) {
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l))) AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
publish(Debug(simpleName(this), "shutting down: StandardOutLogger started")) publish(Debug(simpleName(this), this.getClass, "shutting down: StandardOutLogger started"))
} }
for { for {
logger loggers logger loggers
@ -151,33 +148,105 @@ trait LoggingBus extends ActorEventBus {
case _ case _
} }
} }
publish(Debug(simpleName(this), "all default loggers stopped")) publish(Debug(simpleName(this), this.getClass, "all default loggers stopped"))
} }
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = {
val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
val actor = system.systemActorOf(Props(clazz), name) val actor = system.systemActorOf(Props(clazz), name)
implicit val timeout = Timeout(3 seconds) implicit val timeout = Timeout(3 seconds)
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch {
case _: TimeoutException case _: TimeoutException
publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
} }
if (response != LoggerInitialized) if (response != LoggerInitialized)
throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response) throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response)
AllLogLevels filter (level >= _) foreach (l subscribe(actor, classFor(l))) AllLogLevels filter (level >= _) foreach (l subscribe(actor, classFor(l)))
publish(Debug(simpleName(this), "logger " + name + " started")) publish(Debug(logName, this.getClass, "logger " + name + " started"))
actor actor
} }
} }
/**
* This trait defines the interface to be provided by a log source formatting
* rule as used by [[akka.event.Logging]]s `apply`/`create` method.
*
* See the companion object for default implementations.
*
* Example:
* {{{
* trait MyType { // as an example
* def name: String
* }
*
* implicit val myLogSourceType: LogSource[MyType] = new LogSource {
* def genString(a: MyType) = a.name
* }
*
* class MyClass extends MyType {
* val log = Logging(eventStream, this) // will use "hallo" as logSource
* def name = "hallo"
* }
* }}}
*
* The second variant is used for including the actor systems address:
* {{{
* trait MyType { // as an example
* def name: String
* }
*
* implicit val myLogSourceType: LogSource[MyType] = new LogSource {
* def genString(a: MyType) = a.name
* def genString(a: MyType, s: ActorSystem) = a.name + "," + s
* }
*
* class MyClass extends MyType {
* val sys = ActorSyste("sys")
* val log = Logging(sys, this) // will use "hallo,akka://sys" as logSource
* def name = "hallo"
* }
* }}}
*
* The default implementation of the second variant will just call the first.
*/
trait LogSource[-T] { trait LogSource[-T] {
def genString(t: T): String def genString(t: T): String
def genString(t: T, system: ActorSystem): String = genString(t)
def getClazz(t: T): Class[_] = t.getClass
} }
/**
* This is a marker class which is inserted as originator class into
* [[akka.event.LogEvent]] when the string representation was supplied
* directly.
*/
class DummyClassForStringSources
/**
* This object holds predefined formatting rules for log sources.
*
* In case an [[akka.actor.ActorSystem]] is provided, the following apply:
* <ul>
* <li>[[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path</li>
* <li>providing a `String` as source will append "(<system address>)" and use the result</li>
* <li>providing a `Class` will extract its simple name, append "(<system address>)" and use the result</li>
* <li>anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it</li>
* </ul>
*
* In case a [[akka.event.LoggingBus]] is provided, the following apply:
* <ul>
* <li>[[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path</li>
* <li>providing a `String` as source will be used as is</li>
* <li>providing a `Class` will extract its simple name</li>
* <li>anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it</li>
* </ul>
*/
object LogSource { object LogSource {
implicit val fromString: LogSource[String] = new LogSource[String] { implicit val fromString: LogSource[String] = new LogSource[String] {
def genString(s: String) = s def genString(s: String) = s
override def genString(s: String, system: ActorSystem) = s + "(" + system + ")"
override def getClazz(s: String) = classOf[DummyClassForStringSources]
} }
implicit val fromActor: LogSource[Actor] = new LogSource[Actor] { implicit val fromActor: LogSource[Actor] = new LogSource[Actor] {
@ -191,18 +260,55 @@ object LogSource {
// this one unfortunately does not work as implicit, because existential types have some weird behavior // this one unfortunately does not work as implicit, because existential types have some weird behavior
val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] { val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] {
def genString(c: Class[_]) = simpleName(c) def genString(c: Class[_]) = simpleName(c)
override def genString(c: Class[_], system: ActorSystem) = simpleName(c) + "(" + system + ")"
override def getClazz(c: Class[_]) = c
} }
implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]] implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]]
def apply[T: LogSource](o: T) = implicitly[LogSource[T]].genString(o) /**
* Convenience converter access: given an implicit `LogSource`, generate the
* string representation and originating class.
*/
def apply[T: LogSource](o: T): (String, Class[_]) = {
val ls = implicitly[LogSource[T]]
(ls.genString(o), ls.getClazz(o))
}
def fromAnyRef(o: AnyRef): String = /**
* Convenience converter access: given an implicit `LogSource` and
* [[akka.actor.ActorSystem]], generate the string representation and
* originating class.
*/
def apply[T: LogSource](o: T, system: ActorSystem): (String, Class[_]) = {
val ls = implicitly[LogSource[T]]
(ls.genString(o, system), ls.getClazz(o))
}
/**
* construct string representation for any object according to
* rules above with fallback to its `Class`s simple name.
*/
def fromAnyRef(o: AnyRef): (String, Class[_]) =
o match { o match {
case c: Class[_] fromClass.genString(c) case c: Class[_] apply(c)
case a: Actor fromActor.genString(a) case a: Actor apply(a)
case a: ActorRef fromActorRef.genString(a) case a: ActorRef apply(a)
case s: String s case s: String apply(s)
case x simpleName(x) case x (simpleName(x), x.getClass)
}
/**
* construct string representation for any object according to
* rules above (including the actor systems address) with fallback to its
* `Class`s simple name.
*/
def fromAnyRef(o: AnyRef, system: ActorSystem): (String, Class[_]) =
o match {
case c: Class[_] apply(c)
case a: Actor apply(a)
case a: ActorRef apply(a)
case s: String apply(s)
case x (simpleName(x) + "(" + system + ")", x.getClass)
} }
} }
@ -218,6 +324,11 @@ object LogSource {
* log.info("hello world!") * log.info("hello world!")
* </code></pre> * </code></pre>
* *
* The source object is used in two fashions: its `Class[_]` will be part of
* all log events produced by this logger, plus a string representation is
* generated which may contain per-instance information, see `apply` or `create`
* below.
*
* Loggers are attached to the level-specific channels <code>Error</code>, * Loggers are attached to the level-specific channels <code>Error</code>,
* <code>Warning</code>, <code>Info</code> and <code>Debug</code> as * <code>Warning</code>, <code>Info</code> and <code>Debug</code> as
* appropriate for the configured (or set) log level. If you want to implement * appropriate for the configured (or set) log level. If you want to implement
@ -305,42 +416,80 @@ object Logging {
val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern
/** /**
* Obtain LoggingAdapter for the given event stream (system) and source object. * Obtain LoggingAdapter for the given actor system and source object. This
* Note that there is an implicit conversion from [[akka.actor.ActorSystem]] * will use the systems event stream and include the systems address in the
* to [[akka.event.LoggingBus]]. * log source string.
* *
* The source is used to identify the source of this logging channel and must have * <b>Do not use this if you want to supply a log category string (like
* a corresponding LogSource[T] instance in scope; by default these are * com.example.app.whatever) unaltered,</b> supply `system.eventStream` in this
* provided for Class[_], Actor, ActorRef and String types. The source * case or use
* object is translated to a String according to the following rules: *
* <ul> * {{{
* <li>if it is an Actor or ActorRef, its path is used</li> * Logging(system, this.getClass)
* <li>in case of a String it is used as is</li> * }}}
* <li>in case of a class an approximation of its simpleName *
* <li>and in all other cases the simpleName of its class</li> * The source is used to identify the source of this logging channel and
* </ul> * must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*
* You can add your own rules quite easily, see [[akka.event.LogSource]].
*/ */
def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter = def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = {
new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource)) val (str, clazz) = LogSource(logSource, system)
new BusLogging(system.eventStream, str, clazz)
}
/** /**
* Java API: Obtain LoggingAdapter for the given system and source object. The * Obtain LoggingAdapter for the given logging bus and source object.
* source object is used to identify the source of this logging channel. The source *
* object is translated to a String according to the following rules: * The source is used to identify the source of this logging channel and
* <ul> * must have a corresponding implicit LogSource[T] instance in scope; by
* <li>if it is an Actor or ActorRef, its path is used</li> * default these are provided for Class[_], Actor, ActorRef and String types.
* <li>in case of a String it is used as is</li> * See the companion object of [[akka.event.LogSource]] for details.
* <li>in case of a class an approximation of its simpleName *
* <li>and in all other cases the simpleName of its class</li> * You can add your own rules quite easily, see [[akka.event.LogSource]].
* </ul>
*/ */
def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource)) def apply[T: LogSource](bus: LoggingBus, logSource: T): LoggingAdapter = {
val (str, clazz) = LogSource(logSource)
new BusLogging(bus, str, clazz)
}
/** /**
* Java API: Obtain LoggingAdapter for the given event bus and source object. The * Obtain LoggingAdapter for the given actor system and source object. This
* source object is used to identify the source of this logging channel. * will use the systems event stream and include the systems address in the
* log source string.
*
* <b>Do not use this if you want to supply a log category string (like
* com.example.app.whatever) unaltered,</b> supply `system.eventStream` in this
* case or use
*
* {{{
* Logging.getLogger(system, this.getClass());
* }}}
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*/ */
def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = {
val (str, clazz) = LogSource.fromAnyRef(logSource, system)
new BusLogging(system.eventStream, str, clazz)
}
/**
* Obtain LoggingAdapter for the given logging bus and source object.
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*/
def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = {
val (str, clazz) = LogSource.fromAnyRef(logSource)
new BusLogging(bus, str, clazz)
}
/** /**
* Artificial exception injected into Error events if no Throwable is * Artificial exception injected into Error events if no Throwable is
@ -362,19 +511,34 @@ object Logging {
* The LogLevel of this LogEvent * The LogLevel of this LogEvent
*/ */
def level: LogLevel def level: LogLevel
/**
* The source of this event
*/
def logSource: String
/**
* The class of the source of this event
*/
def logClass: Class[_]
/**
* The message, may be any object or null.
*/
def message: Any
} }
/** /**
* For ERROR Logging * For ERROR Logging
*/ */
case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent { case class Error(cause: Throwable, logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
def this(logSource: String, message: Any) = this(Error.NoCause, logSource, message) def this(logSource: String, logClass: Class[_], message: Any) = this(Error.NoCause, logSource, logClass, message)
override def level = ErrorLevel override def level = ErrorLevel
} }
object Error { object Error {
def apply(logSource: String, message: Any) = new Error(NoCause, logSource, message) def apply(logSource: String, logClass: Class[_], message: Any) = new Error(NoCause, logSource, logClass, message)
/** Null Object used for errors without cause Throwable */ /** Null Object used for errors without cause Throwable */
object NoCause extends NoStackTrace object NoCause extends NoStackTrace
@ -383,21 +547,21 @@ object Logging {
/** /**
* For WARNING Logging * For WARNING Logging
*/ */
case class Warning(logSource: String, message: Any = "") extends LogEvent { case class Warning(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
override def level = WarningLevel override def level = WarningLevel
} }
/** /**
* For INFO Logging * For INFO Logging
*/ */
case class Info(logSource: String, message: Any = "") extends LogEvent { case class Info(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
override def level = InfoLevel override def level = InfoLevel
} }
/** /**
* For DEBUG Logging * For DEBUG Logging
*/ */
case class Debug(logSource: String, message: Any = "") extends LogEvent { case class Debug(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
override def level = DebugLevel override def level = DebugLevel
} }
@ -439,7 +603,7 @@ object Logging {
case e: Warning warning(e) case e: Warning warning(e)
case e: Info info(e) case e: Info info(e)
case e: Debug debug(e) case e: Debug debug(e)
case e warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e)) case e warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e))
} }
} }
@ -626,7 +790,7 @@ trait LoggingAdapter {
} }
} }
class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter { class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter {
import Logging._ import Logging._
@ -635,14 +799,14 @@ class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdap
def isInfoEnabled = bus.logLevel >= InfoLevel def isInfoEnabled = bus.logLevel >= InfoLevel
def isDebugEnabled = bus.logLevel >= DebugLevel def isDebugEnabled = bus.logLevel >= DebugLevel
protected def notifyError(message: String) { bus.publish(Error(logSource, message)) } protected def notifyError(message: String) { bus.publish(Error(logSource, logClass, message)) }
protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) } protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, logClass, message)) }
protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) } protected def notifyWarning(message: String) { bus.publish(Warning(logSource, logClass, message)) }
protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) } protected def notifyInfo(message: String) { bus.publish(Info(logSource, logClass, message)) }
protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) } protected def notifyDebug(message: String) { bus.publish(Debug(logSource, logClass, message)) }
} }

View file

@ -36,7 +36,8 @@ object LoggingReceive {
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = { def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o) val handled = r.isDefinedAt(o)
system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o)) val (str, clazz) = LogSource.fromAnyRef(source)
system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled handled
} }
def apply(o: Any): Unit = r(o) def apply(o: Any): Unit = r(o)

View file

@ -7,8 +7,10 @@ import akka.actor._
import akka.dispatch.Future import akka.dispatch.Future
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit
import akka.util.{ Duration, Timeout } import akka.util.{ Duration, Timeout }
import akka.util.duration._ import akka.util.duration._
import com.typesafe.config.Config
import akka.config.ConfigurationException import akka.config.ConfigurationException
import scala.collection.JavaConversions.iterableAsScalaIterable import scala.collection.JavaConversions.iterableAsScalaIterable
@ -286,12 +288,15 @@ object RoundRobinRouter {
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
* <br> * <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the round robin should both create new actors and use the 'routees' actor(s). * that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br> * <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that * <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RoundRobinLike { extends RouterConfig with RoundRobinLike {
@ -307,9 +312,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
def this(t: java.lang.Iterable[String]) = { def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(t)) this(routees = iterableAsScalaIterable(routeePaths))
} }
/** /**
@ -361,12 +368,15 @@ object RandomRouter {
* A Router that randomly selects one of the target connections to send a message to. * A Router that randomly selects one of the target connections to send a message to.
* <br> * <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s). * that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br> * <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that * <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with RandomLike { extends RouterConfig with RandomLike {
@ -382,9 +392,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
def this(t: java.lang.Iterable[String]) = { def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(t)) this(routees = iterableAsScalaIterable(routeePaths))
} }
/** /**
@ -424,6 +436,159 @@ trait RandomLike { this: RouterConfig ⇒
} }
} }
object SmallestMailboxRouter {
def apply(routees: Iterable[ActorRef]) = new SmallestMailboxRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = {
import scala.collection.JavaConverters._
apply(routees.asScala)
}
}
/**
* A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
* The selection is done in this order:
* <ul>
* <li>pick any idle routee (not processing message) with empty mailbox</li>
* <li>pick any routee with empty mailbox</li>
* <li>pick routee with fewest pending messages in mailbox</li>
* <li>pick any remote routee, remote actors are consider lowest priority,
* since their mailbox size is unknown</li>
* </ul>
*
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with SmallestMailboxLike {
/**
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int) = {
this(nrOfInstances = nr)
}
/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
}
trait SmallestMailboxLike { this: RouterConfig
import java.security.SecureRandom
def nrOfInstances: Int
def routees: Iterable[String]
private val random = new ThreadLocal[SecureRandom] {
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
}
/**
* Returns true if the actor is currently processing a message.
* It will always return false for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def isProcessingMessage(a: ActorRef): Boolean = a match {
case x: LocalActorRef
val cell = x.underlying
cell.mailbox.isScheduled && cell.currentMessage != null
case _ false
}
/**
* Returns true if the actor currently has any pending messages
* in the mailbox, i.e. the mailbox is not empty.
* It will always return false for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def hasMessages(a: ActorRef): Boolean = a match {
case x: LocalActorRef x.underlying.mailbox.hasMessages
case _ false
}
/**
* Returns true if the actor is currently suspended.
* It will always return false for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def isSuspended(a: ActorRef): Boolean = a match {
case x: LocalActorRef
val cell = x.underlying
cell.mailbox.isSuspended
case _ false
}
/**
* Returns the number of pending messages in the mailbox of the actor.
* It will always return 0 for remote actors.
* Method is exposed to subclasses to be able to implement custom
* routers based on mailbox and actor internal state.
*/
protected def numberOfMessages(a: ActorRef): Int = a match {
case x: LocalActorRef x.underlying.mailbox.numberOfMessages
case _ 0
}
def createRoute(props: Props, context: ActorContext): Route = {
val ref = context.self.asInstanceOf[RoutedActorRef]
createAndRegisterRoutees(props, context, nrOfInstances, routees)
def getNext(): ActorRef = {
// non-local actors mailbox size is unknown, so consider them lowest priority
val activeLocal = ref.routees collect { case l: LocalActorRef if !isSuspended(l) l }
// 1. anyone not processing message and with empty mailbox
activeLocal.find(a !isProcessingMessage(a) && !hasMessages(a)) getOrElse {
// 2. anyone with empty mailbox
activeLocal.find(a !hasMessages(a)) getOrElse {
// 3. sort on mailbox size
activeLocal.sortBy(a numberOfMessages(a)).headOption getOrElse {
// 4. no locals, just pick one, random
ref.routees(random.get.nextInt(ref.routees.size))
}
}
}
}
{
case (sender, message)
message match {
case Broadcast(msg) toAll(sender, ref.routees)
case msg List(Destination(sender, getNext()))
}
}
}
}
object BroadcastRouter { object BroadcastRouter {
def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString)) def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString))
@ -439,12 +604,15 @@ object BroadcastRouter {
* A Router that uses broadcasts a message to all its connections. * A Router that uses broadcasts a message to all its connections.
* <br> * <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s). * that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br> * <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that * <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None)
extends RouterConfig with BroadcastLike { extends RouterConfig with BroadcastLike {
@ -460,9 +628,11 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
def this(t: java.lang.Iterable[String]) = { def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(t)) this(routees = iterableAsScalaIterable(routeePaths))
} }
/** /**
@ -507,12 +677,15 @@ object ScatterGatherFirstCompletedRouter {
* Simple router that broadcasts the message to all routees, and replies with the first response. * Simple router that broadcasts the message to all routees, and replies with the first response.
* <br> * <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s). * that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br> * <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that * <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the router is defined in the configuration file for the actor being used.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None) override val resizer: Option[Resizer] = None)
@ -529,9 +702,11 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/ */
def this(t: java.lang.Iterable[String], w: Duration) = { def this(routeePaths: java.lang.Iterable[String], w: Duration) = {
this(routees = iterableAsScalaIterable(t), within = w) this(routees = iterableAsScalaIterable(routeePaths), within = w)
} }
/** /**
@ -587,6 +762,19 @@ trait Resizer {
def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig)
} }
case object DefaultResizer {
def apply(resizerConfig: Config): DefaultResizer =
DefaultResizer(
lowerBound = resizerConfig.getInt("lower-bound"),
upperBound = resizerConfig.getInt("upper-bound"),
pressureThreshold = resizerConfig.getInt("pressure-threshold"),
rampupRate = resizerConfig.getDouble("rampup-rate"),
backoffThreshold = resizerConfig.getDouble("backoff-threshold"),
backoffRate = resizerConfig.getDouble("backoff-rate"),
stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS),
messagesPerResize = resizerConfig.getInt("messages-per-resize"))
}
case class DefaultResizer( case class DefaultResizer(
/** /**
* The fewest number of routees the router should ever have. * The fewest number of routees the router should ever have.

View file

@ -21,7 +21,7 @@ object JMX {
case e: InstanceAlreadyExistsException case e: InstanceAlreadyExistsException
Some(mbeanServer.getObjectInstance(name)) Some(mbeanServer.getObjectInstance(name))
case e: Exception case e: Exception
system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean))) system.eventStream.publish(Error(e, "JMX", this.getClass, "Error when registering mbean [%s]".format(mbean)))
None None
} }
@ -29,6 +29,6 @@ object JMX {
mbeanServer.unregisterMBean(mbean) mbeanServer.unregisterMBean(mbean)
} catch { } catch {
case e: InstanceNotFoundException {} case e: InstanceNotFoundException {}
case e: Exception system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean))) case e: Exception system.eventStream.publish(Error(e, "JMX", this.getClass, "Error while unregistering mbean [%s]".format(mbean)))
} }
} }

View file

@ -1,3 +1,5 @@
.. _jmm:
Akka and the Java Memory Model Akka and the Java Memory Model
================================ ================================

View file

@ -1,4 +1,3 @@
.. _deployment-scenarios: .. _deployment-scenarios:
################################### ###################################
@ -28,7 +27,7 @@ Actors as services
^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
The simplest way you can use Akka is to use the actors as services in your Web The simplest way you can use Akka is to use the actors as services in your Web
application. All thats needed to do that is to put the Akka charts as well as application. All thats needed to do that is to put the Akka jars as well as
its dependency jars into ``WEB-INF/lib``. You also need to put the :ref:`configuration` its dependency jars into ``WEB-INF/lib``. You also need to put the :ref:`configuration`
file in the ``$AKKA_HOME/config`` directory. Now you can create your file in the ``$AKKA_HOME/config`` directory. Now you can create your
Actors as regular services referenced from your Web application. You should also Actors as regular services referenced from your Web application. You should also

View file

@ -7,6 +7,7 @@ import akka.routing.ScatterGatherFirstCompletedRouter;
import akka.routing.BroadcastRouter; import akka.routing.BroadcastRouter;
import akka.routing.RandomRouter; import akka.routing.RandomRouter;
import akka.routing.RoundRobinRouter; import akka.routing.RoundRobinRouter;
import akka.routing.SmallestMailboxRouter;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
@ -34,6 +35,14 @@ public class ParentActor extends UntypedActor {
randomRouter.tell(i, getSelf()); randomRouter.tell(i, getSelf());
} }
//#randomRouter //#randomRouter
} else if (msg.equals("smr")) {
//#smallestMailboxRouter
ActorRef smallestMailboxRouter = getContext().actorOf(
new Props(PrintlnActor.class).withRouter(new SmallestMailboxRouter(5)), "router");
for (int i = 1; i <= 10; i++) {
smallestMailboxRouter.tell(i, getSelf());
}
//#smallestMailboxRouter
} else if (msg.equals("br")) { } else if (msg.equals("br")) {
//#broadcastRouter //#broadcastRouter
ActorRef broadcastRouter = getContext().actorOf(new Props(PrintlnActor.class).withRouter(new BroadcastRouter(5)), ActorRef broadcastRouter = getContext().actorOf(new Props(PrintlnActor.class).withRouter(new BroadcastRouter(5)),

View file

@ -17,8 +17,13 @@ as illustrated in this example:
.. includecode:: code/akka/docs/event/LoggingDocTestBase.java .. includecode:: code/akka/docs/event/LoggingDocTestBase.java
:include: imports,my-actor :include: imports,my-actor
The second parameter to the ``Logging.getLogger`` is the source of this logging channel. The first parameter to ``Logging.getLogger`` could also be any
The source object is translated to a String according to the following rules: :class:`LoggingBus`, specifically ``system.eventStream()``; in the demonstrated
case, the actor systems address is included in the ``akkaSource``
representation of the log source (see `Logging Thread and Akka Source in MDC`_)
while in the second case this is not automatically done. The second parameter
to ``Logging.getLogger`` is the source of this logging channel. The source
object is translated to a String according to the following rules:
* if it is an Actor or ActorRef, its path is used * if it is an Actor or ActorRef, its path is used
* in case of a String it is used as is * in case of a String it is used as is
@ -28,6 +33,13 @@ The source object is translated to a String according to the following rules:
The log message may contain argument placeholders ``{}``, which will be substituted if the log level The log message may contain argument placeholders ``{}``, which will be substituted if the log level
is enabled. is enabled.
The Java :class:`Class` of the log source is also included in the generated
:class:`LogEvent`. In case of a simple string this is replaced with a “marker”
class :class:`akka.event.DummyClassForStringSources` in order to allow special
treatment of this case, e.g. in the SLF4J event listener which will then use
the string instead of the class name for looking up the logger instance to
use.
Event Handler Event Handler
============= =============
@ -83,8 +95,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
loglevel = "DEBUG" loglevel = "DEBUG"
} }
Logging thread in MDC Logging Thread and Akka Source in MDC
--------------------- -------------------------------------
Since the logging is done asynchronously the thread in which the logging was performed is captured in Since the logging is done asynchronously the thread in which the logging was performed is captured in
Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``. Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``.
@ -96,3 +108,22 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi
</layout> </layout>
</appender> </appender>
.. note::
It will probably be a good idea to use the ``sourceThread`` MDC value also in
non-Akka parts of the application in order to have this property consistently
available in the logs.
Another helpful facility is that Akka captures the actors address when
instantiating a logger within it, meaning that the full instance identification
is available for associating log messages e.g. with members of a router. This
information is available in the MDC with attribute name ``akkaSource``::
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout>
<pattern>%date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
</layout>
</appender>
For more details on what this attribute contains—also for non-actors—please see
`How to Log`_.

View file

@ -16,11 +16,12 @@ Router
A Router is an actor that routes incoming messages to outbound actors. A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'. The router routes the messages sent to it to its underlying actors called 'routees'.
Akka comes with four defined routers out of the box, but as you will see in this chapter it Akka comes with some defined routers out of the box, but as you will see in this chapter it
is really easy to create your own. The four routers shipped with Akka are: is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.RoundRobinRouter`` * ``akka.routing.RoundRobinRouter``
* ``akka.routing.RandomRouter`` * ``akka.routing.RandomRouter``
* ``akka.routing.SmallestMailboxRouter``
* ``akka.routing.BroadcastRouter`` * ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter``
@ -44,9 +45,8 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it. When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in *It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
the configuration file then this value will be used instead of any programmatically sent parameters, but you must instead of any programmatically sent parameters.*
also define the ``router`` property in the configuration.*
Once you have the router actor it is just to send messages to it as you would to any actor: Once you have the router actor it is just to send messages to it as you would to any actor:
@ -122,6 +122,21 @@ When run you should see a similar output to this:
The result from running the random router should be different, or at least random, every time you run it. The result from running the random router should be different, or at least random, every time you run it.
Try to run it a couple of times to verify its behavior if you don't trust us. Try to run it a couple of times to verify its behavior if you don't trust us.
SmallestMailboxRouter
*********************
A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
The selection is done in this order:
* pick any idle routee (not processing message) with empty mailbox
* pick any routee with empty mailbox
* pick routee with fewest pending messages in mailbox
* pick any remote routee, remote actors are consider lowest priority,
since their mailbox size is unknown
Code example:
.. includecode:: code/akka/docs/jrouting/ParentActor.java#smallestMailboxRouter
BroadcastRouter BroadcastRouter
*************** ***************
A broadcast router forwards the message it receives to *all* its routees. A broadcast router forwards the message it receives to *all* its routees.
@ -241,6 +256,14 @@ If you are interested in how to use the VoteCountRouter it looks like this:
.. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest .. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest
Configured Custom Router
************************
It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment
configuration you define the fully qualified class name of the router class. The router class must extend
``akka.routing.CustomRouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter.
The deployment section of the configuration is passed to the constructor.
Custom Resizer Custom Resizer
************** **************

View file

@ -4,6 +4,10 @@
Migration Guide 1.3.x to 2.0.x Migration Guide 1.3.x to 2.0.x
################################ ################################
.. sidebar:: Contents
.. contents:: :local:
Actors Actors
====== ======
@ -13,9 +17,177 @@ significant amount of time.
Detailed migration guide will be written. Detailed migration guide will be written.
Migration Kit
=============
Nobody likes a big refactoring that takes several days to complete until
anything is able to run again. Therefore we provide a migration kit that
makes it possible to do the migration changes in smaller steps.
The migration kit only covers the most common usage of Akka. It is not intended
as a final solution. The whole migration kit is deprecated and will be removed in
Akka 2.1.
The migration kit is provided in separate jar files. Add the following dependency::
"com.typesafe.akka" % "akka-actor-migration" % "2.0-SNAPSHOT"
The first step of the migration is to do some trivial replacements.
Search and replace the following (be careful with the non qualified names):
==================================== ====================================
Search Replace with
==================================== ====================================
``akka.actor.Actor`` ``akka.actor.OldActor``
``extends Actor`` ``extends OldActor``
``akka.actor.Scheduler`` ``akka.actor.OldScheduler``
``Scheduler`` ``OldScheduler``
``akka.event.EventHandler`` ``akka.event.OldEventHandler``
``EventHandler`` ``OldEventHandler``
``akka.config.Config`` ``akka.config.OldConfig``
``Config`` ``OldConfig``
==================================== ====================================
For Scala users the migration kit also contains some implicit conversions to be
able to use some old methods. These conversions are useful from tests or other
code used outside actors.
::
import akka.migration._
Thereafter you need to fix compilation errors that are not handled by the migration
kit, such as:
* Definition of supervisors
* Definition of dispatchers
* ActorRegistry
When everything compiles you continue by replacing/removing the ``OldXxx`` classes
one-by-one from the migration kit with appropriate migration.
When using the migration kit there will be one global actor system, which loads
the configuration ``akka.conf`` from the same locations as in Akka 1.x.
This means that while you are using the migration kit you should not create your
own ``ActorSystem``, but instead use the ``akka.actor.GlobalActorSystem``.
In order to voluntarily exit the JVM you must ``shutdown`` the ``GlobalActorSystem``
Last task of the migration would be to create your own ``ActorSystem``.
Unordered Collection of Migration Items Unordered Collection of Migration Items
======================================= =======================================
Creating and starting actors
----------------------------
Actors are created by passing in a ``Props`` instance into the actorOf factory method in
a ``ActorRefProvider``, which is the ``ActorSystem`` or ``ActorContext``.
Use the system to create top level actors. Use the context to
create actors from other actors. The difference is how the supervisor hierarchy is arranged.
When using the context the current actor will be supervisor of the created child actor.
When using the system it will be a top level actor, that is supervised by the system
(internal guardian actor).
``ActorRef.start()`` has been removed. Actors are now started automatically when created.
Remove all invocations of ``ActorRef.start()``.
v1.3::
val myActor = Actor.actorOf[MyActor]
myActor.start()
v2.0::
// top level actor
val firstActor = system.actorOf(Props[FirstActor], name = "first")
// child actor
class FirstActor extends Actor {
val myActor = context.actorOf(Props[MyActor], name = "myactor")
Documentation:
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
Stopping actors
---------------
``ActorRef.stop()`` has been moved. Use ``ActorSystem`` or ``ActorContext`` to stop actors.
v1.3::
actorRef.stop()
self.stop()
actorRef ! PoisonPill
v2.0::
context.stop(someChild)
context.stop(self)
system.stop(actorRef)
actorRef ! PoisonPill
*Stop all actors*
v1.3::
ActorRegistry.shutdownAll()
v2.0::
system.shutdown()
Documentation:
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
Identifying Actors
------------------
In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``.
The ``ActorRegistry`` has been replaced by actor paths and lookup with
``actorFor`` in ``ActorRefProvider`` (``ActorSystem`` or ``ActorContext``).
v1.3::
val actor = Actor.registry.actorFor(uuid)
val actors = Actor.registry.actorsFor(id)
v2.0::
val actor = context.actorFor("/user/serviceA/aggregator")
Documentation:
* :ref:`addressing`
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
Reply to messages
-----------------
``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala)
or ``getSender()`` (Java). This works for both tell (!) and ask (?).
v1.3::
self.channel ! result
self.channel tryTell result
self.reply(result)
self.tryReply(result)
v2.0::
sender ! result
Documentation:
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
``ActorRef.ask()`` ``ActorRef.ask()``
------------------ ------------------
@ -28,7 +200,185 @@ reply to be received; it is independent of the timeout applied when awaiting
completion of the :class:`Future`, however, the actor will complete the completion of the :class:`Future`, however, the actor will complete the
:class:`Future` with an :class:`AskTimeoutException` when it stops itself. :class:`Future` with an :class:`AskTimeoutException` when it stops itself.
Documentation:
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
ActorPool ActorPool
--------- ---------
The ActorPool has been replaced by dynamically resizable routers. The ActorPool has been replaced by dynamically resizable routers.
Documentation:
* :ref:`routing-scala`
* :ref:`routing-java`
``UntypedActor.getContext()`` (Java API only)
---------------------------------------------
``getContext()`` in the Java API for UntypedActor is renamed to
``getSelf()``.
v1.3::
actorRef.tell("Hello", getContext());
v2.0::
actorRef.tell("Hello", getSelf());
Documentation:
* :ref:`untyped-actors-java`
Logging
-------
EventHandler API has been replaced by LoggingAdapter, which publish log messages
to the event bus. You can still plugin your own actor as event listener with the
``akka.event-handlers`` configuration property.
v1.3::
EventHandler.error(exception, this, message)
EventHandler.warning(this, message)
EventHandler.info(this, message)
EventHandler.debug(this, message)
EventHandler.debug(this, "Processing took %s ms".format(duration))
v2.0::
import akka.event.Logging
val log = Logging(context.system, this)
log.error(exception, this, message)
log.warning(this, message)
log.info(this, message)
log.debug(this, message)
log.debug(this, "Processing took {} ms", duration)
Documentation:
* :ref:`logging-scala`
* :ref:`logging-java`
* :ref:`event-bus-scala`
* :ref:`event-bus-java`
Supervision
-----------
Akka v2.0 implements parental supervision. Actors can only be created by other actors — where the top-level
actor is provided by the library — and each created actor is supervised by its parent.
In contrast to the special supervision relationship between parent and child, each actor may monitor any
other actor for termination.
v1.3::
self.link(actorRef)
self.unlink(actorRef)
v2.0::
class WatchActor extends Actor {
val actorRef = ...
// Terminated message will be delivered when the actorRef actor
// is stopped
context.watch(actorRef)
val supervisedChild = context.actorOf(Props[ChildActor])
def receive = {
case Terminated(`actorRef`) ⇒ ...
}
}
Note that ``link`` in v1.3 established a supervision relation, which ``watch`` doesn't.
``watch`` is only a way to get notification, ``Terminated`` message, when the monitored
actor has been stopped.
*Refererence to the supervisor*
v1.3::
self.supervisor
v2.0::
context.parent
*Fault handling strategy*
v1.3::
val supervisor = Supervisor(
SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
Supervise(
actorOf[MyActor1],
Permanent) ::
Supervise(
actorOf[MyActor2],
Permanent) ::
Nil))
v2.0::
val strategy = OneForOneStrategy({
case _: ArithmeticException ⇒ Resume
case _: NullPointerException ⇒ Restart
case _: IllegalArgumentException ⇒ Stop
case _: Exception ⇒ Escalate
}: Decider, maxNrOfRetries = Some(10), withinTimeRange = Some(60000))
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(strategy), "supervisor")
Documentation:
* :ref:`supervision`
* :ref:`fault-tolerance-java`
* :ref:`fault-tolerance-scala`
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
Spawn
-----
``spawn`` has been removed and can be implemented like this, if needed. Be careful to not
access any shared mutable state closed over by the body.
::
def spawn(body: ⇒ Unit) {
system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) })) ! "go"
}
Documentation:
* :ref:`jmm`
HotSwap
-------
In v2.0 ``become`` and ``unbecome`` metods are located in ``ActorContext``, i.e. ``context.become`` and ``context.unbecome``.
The special ``HotSwap`` and ``RevertHotswap`` messages in v1.3 has been removed. Similar can be
implemented with your own message and using ``context.become`` and ``context.unbecome``
in the actor receiving the message.
* :ref:`actors-scala`
* :ref:`untyped-actors-java`
More to be written
------------------
* Futures
* Dispatchers
* STM
* TypedActors
* Routing
* Remoting
* Scheduler
* Configuration
* ...?

View file

@ -7,6 +7,9 @@ package akka.docs.actor
import akka.actor.Actor import akka.actor.Actor
import akka.actor.Props import akka.actor.Props
import akka.event.Logging import akka.event.Logging
//#imports1
import akka.dispatch.Future import akka.dispatch.Future
import akka.actor.ActorSystem import akka.actor.ActorSystem
import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.{ BeforeAndAfterAll, WordSpec }
@ -162,10 +165,10 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
system.eventStream.subscribe(testActor, classOf[Logging.Info]) system.eventStream.subscribe(testActor, classOf[Logging.Info])
myActor ! "test" myActor ! "test"
expectMsgPF(1 second) { case Logging.Info(_, "received test") true } expectMsgPF(1 second) { case Logging.Info(_, _, "received test") true }
myActor ! "unknown" myActor ! "unknown"
expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") true } expectMsgPF(1 second) { case Logging.Info(_, _, "received unknown message") true }
system.eventStream.unsubscribe(testActor) system.eventStream.unsubscribe(testActor)
system.eventStream.publish(TestEvent.UnMute(filter)) system.eventStream.publish(TestEvent.UnMute(filter))

View file

@ -38,15 +38,33 @@ object LoggingDocSpec {
class MyEventListener extends Actor { class MyEventListener extends Actor {
def receive = { def receive = {
case InitializeLogger(_) sender ! LoggerInitialized case InitializeLogger(_) sender ! LoggerInitialized
case Error(cause, logSource, message) // ... case Error(cause, logSource, logClass, message) // ...
case Warning(logSource, message) // ... case Warning(logSource, logClass, message) // ...
case Info(logSource, message) // ... case Info(logSource, logClass, message) // ...
case Debug(logSource, message) // ... case Debug(logSource, logClass, message) // ...
} }
} }
//#my-event-listener //#my-event-listener
//#my-source
import akka.event.LogSource
import akka.actor.ActorSystem
object MyType {
implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
}
class MyType(system: ActorSystem) {
import MyType._
import akka.event.Logging
val log = Logging(system, this)
}
//#my-source
} }
class LoggingDocSpec extends AkkaSpec { class LoggingDocSpec extends AkkaSpec {

View file

@ -8,6 +8,7 @@ import annotation.tailrec
import akka.actor.{ Props, Actor } import akka.actor.{ Props, Actor }
import akka.util.duration._ import akka.util.duration._
import akka.dispatch.Await import akka.dispatch.Await
import akka.routing.SmallestMailboxRouter
case class FibonacciNumber(nbr: Int) case class FibonacciNumber(nbr: Int)
@ -59,6 +60,14 @@ class ParentActor extends Actor {
i randomRouter ! i i randomRouter ! i
} }
//#randomRouter //#randomRouter
case "smr"
//#smallestMailboxRouter
val smallestMailboxRouter =
context.actorOf(Props[PrintlnActor].withRouter(SmallestMailboxRouter(5)), "router")
1 to 10 foreach {
i smallestMailboxRouter ! i
}
//#smallestMailboxRouter
case "br" case "br"
//#broadcastRouter //#broadcastRouter
val broadcastRouter = val broadcastRouter =

View file

@ -22,6 +22,8 @@ For convenience you can mixin the ``log`` member into actors, instead of definin
.. code-block:: scala .. code-block:: scala
class MyActor extends Actor with akka.actor.ActorLogging { class MyActor extends Actor with akka.actor.ActorLogging {
...
}
The second parameter to the ``Logging`` is the source of this logging channel. The second parameter to the ``Logging`` is the source of this logging channel.
The source object is translated to a String according to the following rules: The source object is translated to a String according to the following rules:
@ -29,17 +31,46 @@ The source object is translated to a String according to the following rules:
* if it is an Actor or ActorRef, its path is used * if it is an Actor or ActorRef, its path is used
* in case of a String it is used as is * in case of a String it is used as is
* in case of a class an approximation of its simpleName * in case of a class an approximation of its simpleName
* and in all other cases the simpleName of its class * and in all other cases a compile error occurs unless and implicit
:class:`LogSource[T]` is in scope for the type in question.
The log message may contain argument placeholders ``{}``, which will be substituted if the log level The log message may contain argument placeholders ``{}``, which will be substituted if the log level
is enabled. is enabled.
Translating Log Source to String and Class
------------------------------------------
The rules for translating the source object to the source string and class
which are inserted into the :class:`LogEvent` during runtime are implemented
using implicit parameters and thus fully customizable: simply create your own
instance of :class:`LogSource[T]` and have it in scope when creating the
logger.
.. includecode:: code/akka/docs/event/LoggingDocSpec.scala#my-source
This example creates a log source which mimics traditional usage of Java
loggers, which are based upon the originating objects class name as log
category. The override of :meth:`getClazz` is only included for demonstration
purposes as it contains exactly the default behavior.
.. note::
You may also create the string representation up front and pass that in as
the log source, but be aware that then the :class:`Class[_]` which will be
put in the :class:`LogEvent` is
:class:`akka.event.DummyClassForStringSources`.
The SLF4J event listener treats this case specially (using the actual string
to look up the logger instance to use instead of the class name), and you
might want to do this also in case you implement your own loggin adapter.
Event Handler Event Handler
============= =============
Logging is performed asynchronously through an event bus. You can configure which event handlers that should Logging is performed asynchronously through an event bus. You can configure
subscribe to the logging events. That is done using the 'event-handlers' element in the :ref:`configuration`. which event handlers that should subscribe to the logging events. That is done
Here you can also define the log level. using the ``event-handlers`` element in the :ref:`configuration`. Here you can
also define the log level.
.. code-block:: ruby .. code-block:: ruby
@ -50,7 +81,8 @@ Here you can also define the log level.
loglevel = "DEBUG" loglevel = "DEBUG"
} }
The default one logs to STDOUT and is registered by default. It is not intended to be used for production. There is also an :ref:`slf4j-scala` The default one logs to STDOUT and is registered by default. It is not intended
to be used for production. There is also an :ref:`slf4j-scala`
event handler available in the 'akka-slf4j' module. event handler available in the 'akka-slf4j' module.
Example of creating a listener: Example of creating a listener:
@ -58,7 +90,6 @@ Example of creating a listener:
.. includecode:: code/akka/docs/event/LoggingDocSpec.scala .. includecode:: code/akka/docs/event/LoggingDocSpec.scala
:include: my-event-listener :include: my-event-listener
.. _slf4j-scala: .. _slf4j-scala:
SLF4J SLF4J
@ -85,8 +116,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
loglevel = "DEBUG" loglevel = "DEBUG"
} }
Logging thread in MDC Logging Thread and Akka Source in MDC
--------------------- -------------------------------------
Since the logging is done asynchronously the thread in which the logging was performed is captured in Since the logging is done asynchronously the thread in which the logging was performed is captured in
Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``. Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``.
@ -98,3 +129,22 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi
</layout> </layout>
</appender> </appender>
.. note::
It will probably be a good idea to use the ``sourceThread`` MDC value also in
non-Akka parts of the application in order to have this property consistently
available in the logs.
Another helpful facility is that Akka captures the actors address when
instantiating a logger within it, meaning that the full instance identification
is available for associating log messages e.g. with members of a router. This
information is available in the MDC with attribute name ``akkaSource``::
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout>
<pattern>%date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
</layout>
</appender>
For more details on what this attribute contains—also for non-actors—please see
`How to Log`_.

View file

@ -16,11 +16,12 @@ Router
A Router is an actor that routes incoming messages to outbound actors. A Router is an actor that routes incoming messages to outbound actors.
The router routes the messages sent to it to its underlying actors called 'routees'. The router routes the messages sent to it to its underlying actors called 'routees'.
Akka comes with four defined routers out of the box, but as you will see in this chapter it Akka comes with some defined routers out of the box, but as you will see in this chapter it
is really easy to create your own. The four routers shipped with Akka are: is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.RoundRobinRouter`` * ``akka.routing.RoundRobinRouter``
* ``akka.routing.RandomRouter`` * ``akka.routing.RandomRouter``
* ``akka.routing.SmallestMailboxRouter``
* ``akka.routing.BroadcastRouter`` * ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter``
@ -44,9 +45,8 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it. When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
*It is also worth pointing out that if you define the number of routees (``nr-of-instances`` or ``routees``) in *It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
the configuration file then this value will be used instead of any programmatically sent parameters, but you must instead of any programmatically sent parameters.*
also define the ``router`` property in the configuration.*
Once you have the router actor it is just to send messages to it as you would to any actor: Once you have the router actor it is just to send messages to it as you would to any actor:
@ -123,6 +123,21 @@ When run you should see a similar output to this:
The result from running the random router should be different, or at least random, every time you run it. The result from running the random router should be different, or at least random, every time you run it.
Try to run it a couple of times to verify its behavior if you don't trust us. Try to run it a couple of times to verify its behavior if you don't trust us.
SmallestMailboxRouter
*********************
A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
The selection is done in this order:
* pick any idle routee (not processing message) with empty mailbox
* pick any routee with empty mailbox
* pick routee with fewest pending messages in mailbox
* pick any remote routee, remote actors are consider lowest priority,
since their mailbox size is unknown
Code example:
.. includecode:: code/akka/docs/routing/RouterTypeExample.scala#smallestMailboxRouter
BroadcastRouter BroadcastRouter
*************** ***************
A broadcast router forwards the message it receives to *all* its routees. A broadcast router forwards the message it receives to *all* its routees.
@ -240,6 +255,14 @@ All in all the custom router looks like this:
If you are interested in how to use the VoteCountRouter you can have a look at the test class If you are interested in how to use the VoteCountRouter you can have a look at the test class
`RoutingSpec <https://github.com/jboner/akka/blob/master/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala>`_ `RoutingSpec <https://github.com/jboner/akka/blob/master/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala>`_
Configured Custom Router
************************
It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment
configuration you define the fully qualified class name of the router class. The router class must extend
``akka.routing.RouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter.
The deployment section of the configuration is passed to the constructor.
Custom Resizer Custom Resizer
************** **************

View file

@ -153,7 +153,7 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
case message: DaemonMsg case message: DaemonMsg
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName) log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName)
message match { message match {
case DaemonMsgCreate(factory, path, supervisor) case DaemonMsgCreate(factory, path, supervisor)
import remote.remoteAddress import remote.remoteAddress

View file

@ -27,8 +27,6 @@ class RemoteActorRefProvider(
val scheduler: Scheduler, val scheduler: Scheduler,
_deadLetters: InternalActorRef) extends ActorRefProvider { _deadLetters: InternalActorRef) extends ActorRefProvider {
val log = Logging(eventStream, "RemoteActorRefProvider")
val remoteSettings = new RemoteSettings(settings.config, systemName) val remoteSettings = new RemoteSettings(settings.config, systemName)
def rootGuardian = local.rootGuardian def rootGuardian = local.rootGuardian
@ -44,6 +42,8 @@ class RemoteActorRefProvider(
val remote = new Remote(settings, remoteSettings) val remote = new Remote(settings, remoteSettings)
implicit val transports = remote.transports implicit val transports = remote.transports
val log = Logging(eventStream, "RemoteActorRefProvider(" + remote.remoteAddress + ")")
val rootPath: ActorPath = RootActorPath(remote.remoteAddress) val rootPath: ActorPath = RootActorPath(remote.remoteAddress)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer)

View file

@ -28,6 +28,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings
val r = deploy.routing match { val r = deploy.routing match {
case RoundRobinRouter(x, _, resizer) RemoteRoundRobinRouter(x, nodes, resizer) case RoundRobinRouter(x, _, resizer) RemoteRoundRobinRouter(x, nodes, resizer)
case RandomRouter(x, _, resizer) RemoteRandomRouter(x, nodes, resizer) case RandomRouter(x, _, resizer) RemoteRandomRouter(x, nodes, resizer)
case SmallestMailboxRouter(x, _, resizer) RemoteSmallestMailboxRouter(x, nodes, resizer)
case BroadcastRouter(x, _, resizer) RemoteBroadcastRouter(x, nodes, resizer) case BroadcastRouter(x, _, resizer) RemoteBroadcastRouter(x, nodes, resizer)
case ScatterGatherFirstCompletedRouter(x, _, w, resizer) RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer) case ScatterGatherFirstCompletedRouter(x, _, w, resizer) RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer)
} }

View file

@ -60,7 +60,7 @@ abstract class RemoteClient private[akka] (
* Converts the message to the wireprotocol and sends the message across the wire * Converts the message to the wireprotocol and sends the message across the wire
*/ */
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
log.debug("Sending message: {}", message) log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
send((message, senderOption, recipient)) send((message, senderOption, recipient))
} else { } else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)

View file

@ -82,6 +82,33 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], ove
def this(resizer: Resizer) = this(0, Nil, Some(resizer)) def this(resizer: Resizer) = this(0, Nil, Some(resizer))
} }
/**
* A Router that tries to send to routee with fewest messages in mailbox.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the random router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/
case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None)
extends RemoteRouterConfig with SmallestMailboxLike {
/**
* Constructor that sets the routees to be used.
* Java API
*/
def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(0, Nil, Some(resizer))
}
/** /**
* A Router that uses broadcasts a message to all its connections. * A Router that uses broadcasts a message to all its connections.
* <br> * <br>

View file

@ -8,6 +8,7 @@ import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory }
import org.slf4j.MDC import org.slf4j.MDC
import akka.event.Logging._ import akka.event.Logging._
import akka.actor._ import akka.actor._
import akka.event.DummyClassForStringSources
/** /**
* Base trait for all classes that wants to be able use the SLF4J logging infrastructure. * Base trait for all classes that wants to be able use the SLF4J logging infrastructure.
@ -19,6 +20,10 @@ trait SLF4JLogging {
object Logger { object Logger {
def apply(logger: String): SLFLogger = SLFLoggerFactory getLogger logger def apply(logger: String): SLFLogger = SLFLoggerFactory getLogger logger
def apply(logClass: Class[_], logSource: String): SLFLogger = logClass match {
case c if c == classOf[DummyClassForStringSources] apply(logSource)
case _ SLFLoggerFactory getLogger logClass
}
def root: SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME) def root: SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME)
} }
@ -31,30 +36,31 @@ object Logger {
class Slf4jEventHandler extends Actor with SLF4JLogging { class Slf4jEventHandler extends Actor with SLF4JLogging {
val mdcThreadAttributeName = "sourceThread" val mdcThreadAttributeName = "sourceThread"
val mdcAkkaSourceAttributeName = "akkaSource"
def receive = { def receive = {
case event @ Error(cause, logSource, message) case event @ Error(cause, logSource, logClass, message)
withMdc(mdcThreadAttributeName, event.thread.getName) { withMdc(logSource, event.thread.getName) {
cause match { cause match {
case Error.NoCause Logger(logSource).error(message.toString) case Error.NoCause Logger(logClass, logSource).error(message.toString)
case _ Logger(logSource).error(message.toString, cause) case _ Logger(logClass, logSource).error(message.toString, cause)
} }
} }
case event @ Warning(logSource, message) case event @ Warning(logSource, logClass, message)
withMdc(mdcThreadAttributeName, event.thread.getName) { withMdc(logSource, event.thread.getName) {
Logger(logSource).warn("{}", message.asInstanceOf[AnyRef]) Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef])
} }
case event @ Info(logSource, message) case event @ Info(logSource, logClass, message)
withMdc(mdcThreadAttributeName, event.thread.getName) { withMdc(logSource, event.thread.getName) {
Logger(logSource).info("{}", message.asInstanceOf[AnyRef]) Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef])
} }
case event @ Debug(logSource, message) case event @ Debug(logSource, logClass, message)
withMdc(mdcThreadAttributeName, event.thread.getName) { withMdc(logSource, event.thread.getName) {
Logger(logSource).debug("{}", message.asInstanceOf[AnyRef]) Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef])
} }
case InitializeLogger(_) case InitializeLogger(_)
@ -63,12 +69,14 @@ class Slf4jEventHandler extends Actor with SLF4JLogging {
} }
@inline @inline
final def withMdc(name: String, value: String)(logStatement: Unit) { final def withMdc(logSource: String, thread: String)(logStatement: Unit) {
MDC.put(name, value) MDC.put(mdcAkkaSourceAttributeName, logSource)
MDC.put(mdcThreadAttributeName, thread)
try { try {
logStatement logStatement
} finally { } finally {
MDC.remove(name) MDC.remove(mdcAkkaSourceAttributeName)
MDC.remove(mdcThreadAttributeName)
} }
} }

View file

@ -254,7 +254,7 @@ case class ErrorFilter(
def matches(event: LogEvent) = { def matches(event: LogEvent) = {
event match { event match {
case Error(cause, src, msg) if throwable isInstance cause case Error(cause, src, _, msg) if throwable isInstance cause
(msg == null && cause.getMessage == null && cause.getStackTrace.length == 0) || (msg == null && cause.getMessage == null && cause.getStackTrace.length == 0) ||
doMatch(src, msg) || doMatch(src, cause.getMessage) doMatch(src, msg) || doMatch(src, cause.getMessage)
case _ false case _ false
@ -305,8 +305,8 @@ case class WarningFilter(
def matches(event: LogEvent) = { def matches(event: LogEvent) = {
event match { event match {
case Warning(src, msg) doMatch(src, msg) case Warning(src, _, msg) doMatch(src, msg)
case _ false case _ false
} }
} }
@ -348,8 +348,8 @@ case class InfoFilter(
def matches(event: LogEvent) = { def matches(event: LogEvent) = {
event match { event match {
case Info(src, msg) doMatch(src, msg) case Info(src, _, msg) doMatch(src, msg)
case _ false case _ false
} }
} }
@ -391,8 +391,8 @@ case class DebugFilter(
def matches(event: LogEvent) = { def matches(event: LogEvent) = {
event match { event match {
case Debug(src, msg) doMatch(src, msg) case Debug(src, _, msg) doMatch(src, msg)
case _ false case _ false
} }
} }
@ -456,15 +456,15 @@ class TestEventListener extends Logging.DefaultLogger {
case event: LogEvent if (!filter(event)) print(event) case event: LogEvent if (!filter(event)) print(event)
case DeadLetter(msg: SystemMessage, _, rcp) case DeadLetter(msg: SystemMessage, _, rcp)
if (!msg.isInstanceOf[Terminate]) { if (!msg.isInstanceOf[Terminate]) {
val event = Warning(rcp.path.toString, "received dead system message: " + msg) val event = Warning(rcp.path.toString, rcp.getClass, "received dead system message: " + msg)
if (!filter(event)) print(event) if (!filter(event)) print(event)
} }
case DeadLetter(msg, snd, rcp) case DeadLetter(msg, snd, rcp)
if (!msg.isInstanceOf[Terminated]) { if (!msg.isInstanceOf[Terminated]) {
val event = Warning(rcp.path.toString, "received dead letter from " + snd + ": " + msg) val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg)
if (!filter(event)) print(event) if (!filter(event)) print(event)
} }
case m print(Debug(context.system.name, m)) case m print(Debug(context.system.name, this.getClass, m))
} }
def filter(event: LogEvent): Boolean = filters exists (f try { f(event) } catch { case e: Exception false }) def filter(event: LogEvent): Boolean = filters exists (f try { f(event) } catch { case e: Exception false })

View file

@ -81,7 +81,7 @@ object TestActorRefSpec {
var count = 0 var count = 0
var msg: String = _ var msg: String = _
def receive = { def receive = {
case Warning(_, m: String) count += 1; msg = m case Warning(_, _, m: String) count += 1; msg = m
} }
} }

View file

@ -31,7 +31,7 @@ object AkkaBuild extends Build {
Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id) Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id)
), ),
aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, transactor, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs) aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, transactor, mailboxes, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs)
) )
lazy val actor = Project( lazy val actor = Project(
@ -213,6 +213,13 @@ object AkkaBuild extends Build {
) )
) )
lazy val actorMigration = Project(
id = "akka-actor-migration",
base = file("akka-actor-migration"),
dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings
)
lazy val akkaSbtPlugin = Project( lazy val akkaSbtPlugin = Project(
id = "akka-sbt-plugin", id = "akka-sbt-plugin",
base = file("akka-sbt-plugin"), base = file("akka-sbt-plugin"),
@ -315,6 +322,8 @@ object AkkaBuild extends Build {
if (true || (System getProperty "java.runtime.version" startsWith "1.7")) Seq() else Seq("-optimize")), // -optimize fails with jdk7 if (true || (System getProperty "java.runtime.version" startsWith "1.7")) Seq() else Seq("-optimize")), // -optimize fails with jdk7
javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"), javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet,
parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean, parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean,
// for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec) // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)