From 6a12fb7876b38e0235a7a185abb8149de2ac4c26 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 Jan 2012 13:56:38 +0100 Subject: [PATCH 01/11] Implemented SmallestMailboxRouter. See #1619 --- .../test/scala/akka/routing/RoutingSpec.scala | 51 ++++ akka-actor/src/main/resources/reference.conf | 2 +- .../src/main/scala/akka/actor/Deployer.scala | 13 +- .../src/main/scala/akka/routing/Routing.scala | 225 ++++++++++++++---- .../code/akka/docs/jrouting/ParentActor.java | 9 + akka-docs/java/routing.rst | 20 +- .../akka/docs/routing/RouterTypeExample.scala | 9 + akka-docs/scala/routing.rst | 20 +- .../scala/akka/remote/RemoteDeployer.scala | 1 + .../scala/akka/routing/RemoteRouters.scala | 27 +++ 10 files changed, 322 insertions(+), 55 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 12fada0880..9811313688 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -12,6 +12,7 @@ import akka.dispatch.Await import akka.util.Duration import akka.config.ConfigurationException import com.typesafe.config.ConfigFactory +import java.util.concurrent.ConcurrentHashMap object RoutingSpec { @@ -256,6 +257,56 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } } + "smallest mailbox router" must { + "be started when constructed" in { + val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) + } + + "deliver messages to idle actor" in { + val usedActors = new ConcurrentHashMap[Int, String]() + val router = system.actorOf(Props(new Actor { + def receive = { + case busy: TestLatch ⇒ + usedActors.put(0, self.path.toString) + Await.ready(busy, TestLatch.DefaultTimeout) + case (msg: Int, receivedLatch: TestLatch) ⇒ + usedActors.put(msg, self.path.toString) + receivedLatch.countDown() + } + }).withRouter(SmallestMailboxRouter(3))) + + val busy = TestLatch(1) + router ! busy + + val received1 = TestLatch(1) + router.!((1, received1)) + Await.ready(received1, TestLatch.DefaultTimeout) + + val received2 = TestLatch(1) + router.!((2, received2)) + Await.ready(received2, TestLatch.DefaultTimeout) + + val received3 = TestLatch(1) + router.!((3, received3)) + Await.ready(received3, TestLatch.DefaultTimeout) + + busy.countDown() + + val busyPath = usedActors.get(0) + busyPath must not be (null) + + val path1 = usedActors.get(1) + val path2 = usedActors.get(2) + val path3 = usedActors.get(3) + + path1 must not be (busyPath) + path2 must not be (busyPath) + path3 must not be (busyPath) + + } + } + "broadcast router" must { "be started when constructed" in { val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1))) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 882fccea55..07e363fca9 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -64,7 +64,7 @@ akka { default { # routing (load-balance) scheme to use - # available: "from-code", "round-robin", "random", "scatter-gather", "broadcast" + # available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast" # or: fully qualified class name of the router class # default is "from-code"; # Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter). diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 35ec05432a..23c6da6661 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -71,12 +71,13 @@ class Deployer(val settings: ActorSystem.Settings) { } val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) } val recipe: Option[ActorRecipe] = diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 69589ae651..f021eb867a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -286,7 +286,7 @@ object RoundRobinRouter { * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the round robin should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -361,7 +361,7 @@ object RandomRouter { * A Router that randomly selects one of the target connections to send a message to. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -424,6 +424,143 @@ trait RandomLike { this: RouterConfig ⇒ } } +object SmallestMailboxRouter { + def apply(routees: Iterable[ActorRef]) = new SmallestMailboxRouter(routees = routees map (_.path.toString)) + + /** + * Java API to create router with the supplied 'routees' actors. + */ + def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter = { + import scala.collection.JavaConverters._ + apply(routees.asScala) + } +} +/** + * A Router that tries to send to the routee with fewest messages in mailbox. + * The selection is done in this order: + * + * + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) + extends RouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the routees to be used. + * Java API + */ + def this(t: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(t)) + } + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(resizer = Some(resizer)) +} + +trait SmallestMailboxLike { this: RouterConfig ⇒ + + import java.security.SecureRandom + + def nrOfInstances: Int + + def routees: Iterable[String] + + private val random = new ThreadLocal[SecureRandom] { + override def initialValue = SecureRandom.getInstance("SHA1PRNG") + } + + /** + * Returns true if the actor is currently processing a message. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def isProcessingMessage(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ + val cell = x.underlying + cell.mailbox.isScheduled && cell.currentMessage != null + case _ ⇒ false + } + + /** + * Returns true if the actor currently has any pending messages + * in the mailbox, i.e. the mailbox is not empty. + * It will always return false for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def hasMessages(a: ActorRef): Boolean = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages + case _ ⇒ false + } + + /** + * Returns the number of pending messages in the mailbox of the actor. + * It will always return 0 for remote actors. + * Method is exposed to subclasses to be able to implement custom + * routers based on mailbox and actor internal state. + */ + protected def numberOfMessages(a: ActorRef): Int = a match { + case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages + case _ ⇒ 0 + } + + def createRoute(props: Props, context: ActorContext): Route = { + val ref = context.self.asInstanceOf[RoutedActorRef] + createAndRegisterRoutees(props, context, nrOfInstances, routees) + + def getNext(): ActorRef = { + // non-local actors mailbox size is unknown, so consider them lowest priority + val local: IndexedSeq[LocalActorRef] = for (a ← ref.routees if a.isInstanceOf[LocalActorRef]) yield a.asInstanceOf[LocalActorRef] + // anyone not processing message and with empty mailbox + val idle = local.find(a ⇒ !isProcessingMessage(a) && !hasMessages(a)) + idle getOrElse { + // anyone with empty mailbox + val emptyMailbox = local.find(a ⇒ !hasMessages(a)) + emptyMailbox getOrElse { + // sort on mailbox size + local.sortBy(a ⇒ numberOfMessages(a)).headOption getOrElse { + // no locals, just pick one, random + ref.routees(random.get.nextInt(ref.routees.size)) + } + } + } + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, ref.routees) + case msg ⇒ List(Destination(sender, getNext())) + } + } + } +} + object BroadcastRouter { def apply(routees: Iterable[ActorRef]) = new BroadcastRouter(routees = routees map (_.path.toString)) @@ -439,7 +576,7 @@ object BroadcastRouter { * A Router that uses broadcasts a message to all its connections. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -507,7 +644,7 @@ object ScatterGatherFirstCompletedRouter { * Simple router that broadcasts the message to all routees, and replies with the first response. *
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means - * that the random router should both create new actors and use the 'routees' actor(s). + * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that @@ -515,7 +652,7 @@ object ScatterGatherFirstCompletedRouter { * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, - override val resizer: Option[Resizer] = None) + override val resizer: Option[Resizer] = None) extends RouterConfig with ScatterGatherFirstCompletedLike { /** @@ -593,57 +730,57 @@ case class DefaultResizer( */ lowerBound: Int = 1, /** - * The most number of routees the router should ever have. - * Must be greater than or equal to `lowerBound`. - */ + * The most number of routees the router should ever have. + * Must be greater than or equal to `lowerBound`. + */ upperBound: Int = 10, /** - * Threshold to evaluate if routee is considered to be busy (under pressure). - * Implementation depends on this value (default is 1). - * - */ + * Threshold to evaluate if routee is considered to be busy (under pressure). + * Implementation depends on this value (default is 1). + * + */ pressureThreshold: Int = 1, /** - * Percentage to increase capacity whenever all routees are busy. - * For example, 0.2 would increase 20% (rounded up), i.e. if current - * capacity is 6 it will request an increase of 2 more routees. - */ + * Percentage to increase capacity whenever all routees are busy. + * For example, 0.2 would increase 20% (rounded up), i.e. if current + * capacity is 6 it will request an increase of 2 more routees. + */ rampupRate: Double = 0.2, /** - * Minimum fraction of busy routees before backing off. - * For example, if this is 0.3, then we'll remove some routees only when - * less than 30% of routees are busy, i.e. if current capacity is 10 and - * 3 are busy then the capacity is unchanged, but if 2 or less are busy - * the capacity is decreased. - * - * Use 0.0 or negative to avoid removal of routees. - */ + * Minimum fraction of busy routees before backing off. + * For example, if this is 0.3, then we'll remove some routees only when + * less than 30% of routees are busy, i.e. if current capacity is 10 and + * 3 are busy then the capacity is unchanged, but if 2 or less are busy + * the capacity is decreased. + * + * Use 0.0 or negative to avoid removal of routees. + */ backoffThreshold: Double = 0.3, /** - * Fraction of routees to be removed when the resizer reaches the - * backoffThreshold. - * For example, 0.1 would decrease 10% (rounded up), i.e. if current - * capacity is 9 it will request an decrease of 1 routee. - */ + * Fraction of routees to be removed when the resizer reaches the + * backoffThreshold. + * For example, 0.1 would decrease 10% (rounded up), i.e. if current + * capacity is 9 it will request an decrease of 1 routee. + */ backoffRate: Double = 0.1, /** - * When the resizer reduce the capacity the abandoned routee actors are stopped - * with PoisonPill after this delay. The reason for the delay is to give concurrent - * messages a chance to be placed in mailbox before sending PoisonPill. - * Use 0 seconds to skip delay. - */ + * When the resizer reduce the capacity the abandoned routee actors are stopped + * with PoisonPill after this delay. The reason for the delay is to give concurrent + * messages a chance to be placed in mailbox before sending PoisonPill. + * Use 0 seconds to skip delay. + */ stopDelay: Duration = 1.second, /** - * Number of messages between resize operation. - * Use 1 to resize before each message. - */ + * Number of messages between resize operation. + * Use 1 to resize before each message. + */ messagesPerResize: Int = 10) extends Resizer { /** diff --git a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java index c8d8b019bb..2125ae35a8 100644 --- a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java @@ -7,6 +7,7 @@ import akka.routing.ScatterGatherFirstCompletedRouter; import akka.routing.BroadcastRouter; import akka.routing.RandomRouter; import akka.routing.RoundRobinRouter; +import akka.routing.SmallestMailboxRouter; import akka.actor.UntypedActor; import akka.actor.ActorRef; import akka.actor.Props; @@ -34,6 +35,14 @@ public class ParentActor extends UntypedActor { randomRouter.tell(i, getSelf()); } //#randomRouter + } else if (msg.equals("smr")) { + //#smallestMailboxRouter + ActorRef smallestMailboxRouter = getContext().actorOf( + new Props(PrintlnActor.class).withRouter(new SmallestMailboxRouter(5)), "router"); + for (int i = 1; i <= 10; i++) { + smallestMailboxRouter.tell(i, getSelf()); + } + //#smallestMailboxRouter } else if (msg.equals("br")) { //#broadcastRouter ActorRef broadcastRouter = getContext().actorOf(new Props(PrintlnActor.class).withRouter(new BroadcastRouter(5)), diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index e80514a8fe..8cc5b94260 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -122,6 +123,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/jrouting/ParentActor.java#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index d688da6544..3a9f566ed8 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -8,6 +8,7 @@ import annotation.tailrec import akka.actor.{ Props, Actor } import akka.util.duration._ import akka.dispatch.Await +import akka.routing.SmallestMailboxRouter case class FibonacciNumber(nbr: Int) @@ -59,6 +60,14 @@ class ParentActor extends Actor { i ⇒ randomRouter ! i } //#randomRouter + case "smr" ⇒ + //#smallestMailboxRouter + val smallestMailboxRouter = + context.actorOf(Props[PrintlnActor].withRouter(SmallestMailboxRouter(5)), "router") + 1 to 10 foreach { + i ⇒ smallestMailboxRouter ! i + } + //#smallestMailboxRouter case "br" ⇒ //#broadcastRouter val broadcastRouter = diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index ad06b67b8b..8c0e0f7366 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -16,11 +16,12 @@ Router A Router is an actor that routes incoming messages to outbound actors. The router routes the messages sent to it to its underlying actors called 'routees'. -Akka comes with four defined routers out of the box, but as you will see in this chapter it -is really easy to create your own. The four routers shipped with Akka are: +Akka comes with some defined routers out of the box, but as you will see in this chapter it +is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.RoundRobinRouter`` * ``akka.routing.RandomRouter`` +* ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` @@ -123,6 +124,21 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +SmallestMailboxRouter +********************* +A Router that tries to send to the routee with fewest messages in mailbox. +The selection is done in this order: + + * pick any idle routee (not processing message) with empty mailbox + * pick any routee with empty mailbox + * pick routee with fewest pending messages in mailbox + * pick any remote routee, remote actors are consider lowest priority, + since their mailbox size is unknown + +Code example: + +.. includecode:: code/akka/docs/routing/RouterTypeExample.scala#smallestMailboxRouter + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index d8f466c9d2..fe6844b8dc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -28,6 +28,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings val r = deploy.routing match { case RoundRobinRouter(x, _, resizer) ⇒ RemoteRoundRobinRouter(x, nodes, resizer) case RandomRouter(x, _, resizer) ⇒ RemoteRandomRouter(x, nodes, resizer) + case SmallestMailboxRouter(x, _, resizer) ⇒ RemoteSmallestMailboxRouter(x, nodes, resizer) case BroadcastRouter(x, _, resizer) ⇒ RemoteBroadcastRouter(x, nodes, resizer) case ScatterGatherFirstCompletedRouter(x, _, w, resizer) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes, w, resizer) } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala index 52b2d05618..83a64d09a7 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala @@ -82,6 +82,33 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], ove def this(resizer: Resizer) = this(0, Nil, Some(resizer)) } +/** + * A Router that tries to send to routee with fewest messages in mailbox. + *
+ * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means + * that the random router should both create new actors and use the 'routees' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteSmallestMailboxRouter(nrOfInstances: Int, routees: Iterable[String], override val resizer: Option[Resizer] = None) + extends RemoteRouterConfig with SmallestMailboxLike { + + /** + * Constructor that sets the routees to be used. + * Java API + */ + def this(n: Int, t: java.lang.Iterable[String]) = this(n, t.asScala) + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(0, Nil, Some(resizer)) +} + /** * A Router that uses broadcasts a message to all its connections. *
From 2399f02531ce1d8fda33f5f65bbf951f2671976a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 12 Jan 2012 09:53:53 +0100 Subject: [PATCH 02/11] Improvements based on feedback. See #1619 --- .../test/scala/akka/routing/ResizerSpec.scala | 6 +- .../test/scala/akka/routing/RoutingSpec.scala | 15 ++- .../src/main/scala/akka/routing/Routing.scala | 98 +++++++++++++------ akka-docs/java/routing.rst | 7 +- akka-docs/scala/routing.rst | 7 +- 5 files changed, 86 insertions(+), 47 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index d87d688231..6ccad2a95f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -135,15 +135,15 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer)))) val latch1 = new TestLatch(1) - router.!((latch1, busy)) + router ! (latch1, busy) Await.ready(latch1, 2 seconds) val latch2 = new TestLatch(1) - router.!((latch2, busy)) + router ! (latch2, busy) Await.ready(latch2, 2 seconds) val latch3 = new TestLatch(1) - router.!((latch3, busy)) + router ! (latch3, busy) Await.ready(latch3, 2 seconds) Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9811313688..077e69e5d9 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -267,28 +267,33 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with val usedActors = new ConcurrentHashMap[Int, String]() val router = system.actorOf(Props(new Actor { def receive = { - case busy: TestLatch ⇒ + case (busy: TestLatch, receivedLatch: TestLatch) ⇒ usedActors.put(0, self.path.toString) + self ! "another in busy mailbox" + receivedLatch.countDown() Await.ready(busy, TestLatch.DefaultTimeout) case (msg: Int, receivedLatch: TestLatch) ⇒ usedActors.put(msg, self.path.toString) receivedLatch.countDown() + case s: String ⇒ } }).withRouter(SmallestMailboxRouter(3))) val busy = TestLatch(1) - router ! busy + val received0 = TestLatch(1) + router ! (busy, received0) + Await.ready(received0, TestLatch.DefaultTimeout) val received1 = TestLatch(1) - router.!((1, received1)) + router ! (1, received1) Await.ready(received1, TestLatch.DefaultTimeout) val received2 = TestLatch(1) - router.!((2, received2)) + router ! (2, received2) Await.ready(received2, TestLatch.DefaultTimeout) val received3 = TestLatch(1) - router.!((3, received3)) + router ! (3, received3) Await.ready(received3, TestLatch.DefaultTimeout) busy.countDown() diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index f021eb867a..f3065788ec 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -290,8 +290,11 @@ object RoundRobinRouter { * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RoundRobinLike { @@ -307,9 +310,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -365,8 +370,11 @@ object RandomRouter { * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. *
* The configuration parameter trumps the constructor arguments. This means that - * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will - * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None) extends RouterConfig with RandomLike { @@ -382,9 +390,11 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, /** * Constructor that sets the routees to be used. * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(t: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(t)) + def this(routeePaths: java.lang.Iterable[String]) = { + this(routees = iterableAsScalaIterable(routeePaths)) } /** @@ -436,7 +446,7 @@ object SmallestMailboxRouter { } } /** - * A Router that tries to send to the routee with fewest messages in mailbox. + * A Router that tries to send to the non-suspended routee with fewest messages in mailbox. * The selection is done in this order: * - * + * * You can add your own rules quite easily: - * + * * {{{ * trait MyType { // as an example * def name: String * } - * + * * implicit val myLogSourceType: LogSource[MyType] = new LogSource { * def genString(a: MyType) = a.name * } - * + * * class MyClass extends MyType { * val log = Logging(eventStream, this) // will use "hallo" as logSource * def name = "hallo" @@ -353,60 +403,25 @@ object Logging { *
  • in case of a class an approximation of its simpleName *
  • and in all other cases the simpleName of its class
  • * - * - * You can add your own rules quite easily: - * - * {{{ - * trait MyType { // as an example - * def name: String - * } - * - * implicit val myLogSourceType: LogSource[MyType] = new LogSource { - * def genString(a: MyType) = a.name - * } - * - * class MyClass extends MyType { - * val log = Logging(eventStream, this) // will use "hallo" as logSource - * def name = "hallo" - * } - * }}} - */ - def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = - new BusLogging(system.eventStream, implicitly[LogSource[T]].genString(logSource), logSource.getClass) - - /** - * Obtain LoggingAdapter for the given actor system and source object. This - * will use the system’s event stream. * - * The source is used to identify the source of this logging channel and must have - * a corresponding implicit LogSource[T] instance in scope; by default these are - * provided for Class[_], Actor, ActorRef and String types. By these, the source - * object is translated to a String according to the following rules: - * - * * You can add your own rules quite easily: - * + * * {{{ * trait MyType { // as an example * def name: String * } - * + * * implicit val myLogSourceType: LogSource[MyType] = new LogSource { * def genString(a: MyType) = a.name * } - * + * * class MyClass extends MyType { * val log = Logging(eventStream, this) // will use "hallo" as logSource * def name = "hallo" * } * }}} */ - def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system, LogSource.fromAnyRef(logSource)) + def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system, LogSource.fromAnyRef(logSource, system)) /** * Obtain LoggingAdapter for the given logging bus and source object. This @@ -422,25 +437,25 @@ object Logging { *
  • in case of a class an approximation of its simpleName *
  • and in all other cases the simpleName of its class
  • * - * + * * You can add your own rules quite easily: - * + * * {{{ * trait MyType { // as an example * def name: String * } - * + * * implicit val myLogSourceType: LogSource[MyType] = new LogSource { * def genString(a: MyType) = a.name * } - * + * * class MyClass extends MyType { * val log = Logging(eventStream, this) // will use "hallo" as logSource * def name = "hallo" * } * }}} */ - def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) + //def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) /** * Artificial exception injected into Error events if no Throwable is diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5b748098ac..037f9d594a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -27,8 +27,6 @@ class RemoteActorRefProvider( val scheduler: Scheduler, _deadLetters: InternalActorRef) extends ActorRefProvider { - val log = Logging(eventStream, "RemoteActorRefProvider") - val remoteSettings = new RemoteSettings(settings.config, systemName) def rootGuardian = local.rootGuardian @@ -44,6 +42,8 @@ class RemoteActorRefProvider( val remote = new Remote(settings, remoteSettings) implicit val transports = remote.transports + val log = Logging(eventStream, "RemoteActorRefProvider(" + remote.remoteAddress + ")") + val rootPath: ActorPath = RootActorPath(remote.remoteAddress) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) From 358da77ae306a07dcbd03c32d58ef4493b75fd98 Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 11 Jan 2012 14:12:10 +0100 Subject: [PATCH 06/11] make failing test deterministic for RemoteDeathWatchSpec --- .../test/scala/akka/actor/DeathWatchSpec.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 30828c1014..59fe72cc07 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -34,16 +34,23 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "notify with one Terminated message when an Actor is stopped" in { val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) - startWatching(terminal) - - testActor ! "ping" - expectMsg("ping") + startWatching(terminal) ! "hallo" + expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill terminal ! PoisonPill expectTerminationOf(terminal) } + "notify with one Terminated message when an Actor is already dead" in { + val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) + + terminal ! PoisonPill + + startWatching(terminal) + expectTerminationOf(terminal) + } + "notify with all monitors with one Terminated message when an Actor is stopped" in { val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) val monitor1, monitor2, monitor3 = startWatching(terminal) From 7d0e0065476afde5e9c71ae3090d702a99775be0 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 12 Jan 2012 13:25:21 +0100 Subject: [PATCH 07/11] fix bug in EmptyLocalActorRef serialization, see #1591 - ELAR extended DeadLetterActorRef, which is serialized specially, not keeping the name - made deadletter behavior a trait, mix that into both and only override writeReplace in DLAR - remove extraneous debug settings from RemoteDeathWatchSpec --- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 11 ++++++++--- .../test/scala/akka/remote/RemoteDeathWatchSpec.scala | 2 -- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 254b19e010..94ec966468 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -449,7 +449,10 @@ object DeadLetterActorRef { val serialized = new SerializedDeadLetterActorRef } -class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { +trait DeadLetterActorRefLike extends MinimalActorRef { + + def eventStream: EventStream + @volatile private var brokenPromise: Future[Any] = _ @volatile @@ -477,7 +480,9 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { assert(brokenPromise != null) brokenPromise } +} +class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike { @throws(classOf[java.io.ObjectStreamException]) override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized } @@ -486,8 +491,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { * This special dead letter reference has a name: it is that which is returned * by a local look-up which is unsuccessful. */ -class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) - extends DeadLetterActorRef(_eventStream) { +class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) + extends DeadLetterActorRefLike { init(_dispatcher, _path) override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case d: DeadLetter ⇒ // do NOT form endless loops diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index e585ade6d7..b51720aa01 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -15,10 +15,8 @@ akka { deployment { /watchers.remote = "akka://other@127.0.0.1:2666" } - debug.lifecycle = on } cluster.nodename = buh - loglevel = DEBUG remote.server { hostname = "127.0.0.1" port = 2665 From 62499b59011e6c82181a27420e143c1216ba492f Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 12 Jan 2012 13:45:53 +0100 Subject: [PATCH 08/11] quiesce logging of artifact resolution from Ivy --- project/AkkaBuild.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index ada3dbcbd2..b7e950fa8a 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -315,6 +315,8 @@ object AkkaBuild extends Build { if (true || (System getProperty "java.runtime.version" startsWith "1.7")) Seq() else Seq("-optimize")), // -optimize fails with jdk7 javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"), + ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet, + parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean, // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec) From 8d10d44929c8443f93b4dae6eb1e6991bafbb7a9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 12 Jan 2012 16:37:08 +0100 Subject: [PATCH 09/11] Support config of custom router. See #1623 --- .../test/scala/akka/routing/ResizerSpec.scala | 2 +- .../test/scala/akka/routing/RoutingSpec.scala | 21 ++++++++++++++++ akka-actor/src/main/resources/reference.conf | 4 +++- .../src/main/scala/akka/actor/Deployer.scala | 24 ++++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 15 ++++++++++++ akka-docs/java/routing.rst | 8 +++++++ akka-docs/scala/routing.rst | 8 +++++++ 7 files changed, 69 insertions(+), 13 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 6ccad2a95f..35cc429fa6 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -178,7 +178,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - count.set(0) + (10 millis).dilated.sleep for (m ← 0 until loops) { router.!((t, latch, count)) (10 millis).dilated.sleep diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 077e69e5d9..a9ec39ff6e 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -13,6 +13,7 @@ import akka.util.Duration import akka.config.ConfigurationException import com.typesafe.config.ConfigFactory import java.util.concurrent.ConcurrentHashMap +import com.typesafe.config.Config object RoutingSpec { @@ -22,6 +23,10 @@ object RoutingSpec { router = round-robin nr-of-instances = 3 } + /myrouter { + router = "akka.routing.RoutingSpec$MyRouter" + foo = bar + } } """ @@ -38,6 +43,18 @@ object RoutingSpec { } } + class MyRouter(config: Config) extends RouterConfig { + val foo = config.getString("foo") + def createRoute(routeeProps: Props, actorContext: ActorContext): Route = { + val routees = IndexedSeq(actorContext.actorOf(Props[Echo])) + registerRoutees(actorContext, routees) + + { + case (sender, message) ⇒ Nil + } + } + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -465,6 +482,10 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with sys.shutdown() } } + "support custom router" in { + val myrouter = system.actorOf(Props().withRouter(FromConfig), "myrouter") + myrouter.isTerminated must be(false) + } } "custom router" must { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 07e363fca9..02d1a49035 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -65,7 +65,9 @@ akka { # routing (load-balance) scheme to use # available: "from-code", "round-robin", "random", "smallest-mailbox", "scatter-gather", "broadcast" - # or: fully qualified class name of the router class + # or: Fully qualified class name of the router class. + # The router class must extend akka.routing.CustomRouterConfig and and have constructor + # with com.typesafe.config.Config parameter. # default is "from-code"; # Whether or not an actor is transformed to a Router is decided in code only (Props.withRouter). # The type of router can be overridden in the configuration; specifying "from-code" means diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 23c6da6661..5ac4c13391 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -13,6 +13,7 @@ import akka.event.EventStream import com.typesafe.config._ import akka.routing._ import java.util.concurrent.{ TimeUnit, ConcurrentHashMap } +import akka.util.ReflectiveAccess case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) @@ -56,16 +57,7 @@ class Deployer(val settings: ActorSystem.Settings) { val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) val resizer: Option[Resizer] = if (config.hasPath("resizer")) { - val resizerConfig = deployment.getConfig("resizer") - Some(DefaultResizer( - lowerBound = resizerConfig.getInt("lower-bound"), - upperBound = resizerConfig.getInt("upper-bound"), - pressureThreshold = resizerConfig.getInt("pressure-threshold"), - rampupRate = resizerConfig.getDouble("rampup-rate"), - backoffThreshold = resizerConfig.getDouble("backoff-threshold"), - backoffRate = resizerConfig.getDouble("backoff-rate"), - stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS), - messagesPerResize = resizerConfig.getInt("messages-per-resize"))) + Some(DefaultResizer(deployment.getConfig("resizer"))) } else { None } @@ -77,7 +69,17 @@ class Deployer(val settings: ActorSystem.Settings) { case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) + case fqn ⇒ + val constructorSignature = Array[Class[_]](classOf[Config]) + ReflectiveAccess.createInstance[RouterConfig](fqn, constructorSignature, Array[AnyRef](deployment)) match { + case Right(router) ⇒ router + case Left(exception) ⇒ + throw new IllegalArgumentException( + ("Cannot instantiate router [%s], defined in [%s], " + + "make sure it extends [akka.routing.RouterConfig] and has constructor with " + + "[com.typesafe.config.Config] parameter") + .format(fqn, key), exception) + } } val recipe: Option[ActorRecipe] = diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index f3065788ec..0473f99fd6 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -7,8 +7,10 @@ import akka.actor._ import akka.dispatch.Future import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeUnit import akka.util.{ Duration, Timeout } import akka.util.duration._ +import com.typesafe.config.Config import akka.config.ConfigurationException import scala.collection.JavaConversions.iterableAsScalaIterable @@ -760,6 +762,19 @@ trait Resizer { def resize(props: Props, actorContext: ActorContext, currentRoutees: IndexedSeq[ActorRef], routerConfig: RouterConfig) } +case object DefaultResizer { + def apply(resizerConfig: Config): DefaultResizer = + DefaultResizer( + lowerBound = resizerConfig.getInt("lower-bound"), + upperBound = resizerConfig.getInt("upper-bound"), + pressureThreshold = resizerConfig.getInt("pressure-threshold"), + rampupRate = resizerConfig.getDouble("rampup-rate"), + backoffThreshold = resizerConfig.getDouble("backoff-threshold"), + backoffRate = resizerConfig.getDouble("backoff-rate"), + stopDelay = Duration(resizerConfig.getMilliseconds("stop-delay"), TimeUnit.MILLISECONDS), + messagesPerResize = resizerConfig.getInt("messages-per-resize")) +} + case class DefaultResizer( /** * The fewest number of routees the router should ever have. diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index cdcc869b2a..42ad1108ea 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -256,6 +256,14 @@ If you are interested in how to use the VoteCountRouter it looks like this: .. includecode:: code/akka/docs/jrouting/CustomRouterDocTestBase.java#crTest +Configured Custom Router +************************ + +It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment +configuration you define the fully qualified class name of the router class. The router class must extend +``akka.routing.CustomRouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter. +The deployment section of the configuration is passed to the constructor. + Custom Resizer ************** diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 4e75be8798..5b2ed24d28 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -255,6 +255,14 @@ All in all the custom router looks like this: If you are interested in how to use the VoteCountRouter you can have a look at the test class `RoutingSpec `_ +Configured Custom Router +************************ + +It is possible to define configuration properties for custom routers. In the ``router`` property of the deployment +configuration you define the fully qualified class name of the router class. The router class must extend +``akka.routing.RouterConfig`` and and have constructor with ``com.typesafe.config.Config`` parameter. +The deployment section of the configuration is passed to the constructor. + Custom Resizer ************** From b01640fddb9987ee305f49eaad0e935dec2af950 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 13 Jan 2012 13:50:42 +0100 Subject: [PATCH 10/11] incorporate review feedback - tons of documentation added - lift extraction of logClass into LogSource type-class - prefer Props.empty --- .../scala/akka/actor/DeathWatchSpec.scala | 8 +- .../akka/actor/dispatch/ActorModelSpec.scala | 5 +- .../scala/akka/event/LoggingReceiveSpec.scala | 2 +- .../src/main/scala/akka/event/Logging.scala | 284 ++++++++++-------- .../scala/akka/event/LoggingReceive.scala | 3 +- akka-docs/java/logging.rst | 22 +- .../code/akka/docs/event/LoggingDocSpec.scala | 18 ++ akka-docs/scala/logging.rst | 49 ++- .../main/scala/akka/event/slf4j/SLF4J.scala | 16 +- 9 files changed, 260 insertions(+), 147 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 59fe72cc07..cd6dc58129 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -33,7 +33,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } "notify with one Terminated message when an Actor is stopped" in { - val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) + val terminal = system.actorOf(Props.empty) startWatching(terminal) ! "hallo" expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill @@ -43,7 +43,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } "notify with one Terminated message when an Actor is already dead" in { - val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) + val terminal = system.actorOf(Props.empty) terminal ! PoisonPill @@ -52,7 +52,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } "notify with all monitors with one Terminated message when an Actor is stopped" in { - val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) + val terminal = system.actorOf(Props.empty) val monitor1, monitor2, monitor3 = startWatching(terminal) terminal ! PoisonPill @@ -67,7 +67,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout } "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { - val terminal = system.actorOf(Props(context ⇒ { case _ ⇒ })) + val terminal = system.actorOf(Props.empty) val monitor1, monitor3 = startWatching(terminal) val monitor2 = system.actorOf(Props(new Actor { context.watch(terminal) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index b0d831dc77..fb75ab5593 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -208,8 +208,9 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - system.eventStream.publish(Error(e, Option(dispatcher).toString, - if (dispatcher ne null) dispatcher.getClass else this.getClass, + system.eventStream.publish(Error(e, + Option(dispatcher).toString, + (Option(dispatcher) getOrElse this).getClass, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 6d524729dd..bcfb9c391b 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -59,7 +59,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd } val log = LoggingReceive("funky")(r) log.isDefinedAt("hallo") - expectMsg(1 second, Logging.Debug("funky", classOf[String], "received unhandled message hallo")) + expectMsg(1 second, Logging.Debug("funky", classOf[DummyClassForStringSources], "received unhandled message hallo")) } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 33c4b1339e..07a1da1da5 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -168,15 +168,85 @@ trait LoggingBus extends ActorEventBus { } +/** + * This trait defines the interface to be provided by a “log source formatting + * rule” as used by [[akka.event.Logging]]’s `apply`/`create` method. + * + * See the companion object for default implementations. + * + * Example: + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * } + * + * class MyClass extends MyType { + * val log = Logging(eventStream, this) // will use "hallo" as logSource + * def name = "hallo" + * } + * }}} + * + * The second variant is used for including the actor system’s address: + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * def genString(a: MyType, s: ActorSystem) = a.name + "," + s + * } + * + * class MyClass extends MyType { + * val sys = ActorSyste("sys") + * val log = Logging(sys, this) // will use "hallo,akka://sys" as logSource + * def name = "hallo" + * } + * }}} + * + * The default implementation of the second variant will just call the first. + */ trait LogSource[-T] { def genString(t: T): String def genString(t: T, system: ActorSystem): String = genString(t) + def getClazz(t: T): Class[_] = t.getClass } +/** + * This is a “marker” class which is inserted as originator class into + * [[akka.event.LogEvent]] when the string representation was supplied + * directly. + */ +class DummyClassForStringSources + +/** + * This object holds predefined formatting rules for log sources. + * + * In case an [[akka.actor.ActorSystem]] is provided, the following apply: + *
      + *
    • [[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path
    • + *
    • providing a `String` as source will append "()" and use the result
    • + *
    • providing a `Class` will extract its simple name, append "()" and use the result
    • + *
    • anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it
    • + *
    + * + * In case a [[akka.event.LoggingBus]] is provided, the following apply: + *
      + *
    • [[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path
    • + *
    • providing a `String` as source will be used as is
    • + *
    • providing a `Class` will extract its simple name
    • + *
    • anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it
    • + *
    + */ object LogSource { implicit val fromString: LogSource[String] = new LogSource[String] { def genString(s: String) = s override def genString(s: String, system: ActorSystem) = s + "(" + system + ")" + override def getClazz(s: String) = classOf[DummyClassForStringSources] } implicit val fromActor: LogSource[Actor] = new LogSource[Actor] { @@ -191,29 +261,54 @@ object LogSource { val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] { def genString(c: Class[_]) = simpleName(c) override def genString(c: Class[_], system: ActorSystem) = simpleName(c) + "(" + system + ")" + override def getClazz(c: Class[_]) = c } implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]] - def apply[T: LogSource](o: T): String = implicitly[LogSource[T]].genString(o) + /** + * Convenience converter access: given an implicit `LogSource`, generate the + * string representation and originating class. + */ + def apply[T: LogSource](o: T): (String, Class[_]) = { + val ls = implicitly[LogSource[T]] + (ls.genString(o), ls.getClazz(o)) + } - def apply[T: LogSource](o: T, system: ActorSystem): String = implicitly[LogSource[T]].genString(o, system) + /** + * Convenience converter access: given an implicit `LogSource` and + * [[akka.actor.ActorSystem]], generate the string representation and + * originating class. + */ + def apply[T: LogSource](o: T, system: ActorSystem): (String, Class[_]) = { + val ls = implicitly[LogSource[T]] + (ls.genString(o, system), ls.getClazz(o)) + } - def fromAnyRef(o: AnyRef): String = + /** + * construct string representation for any object according to + * rules above with fallback to its `Class`’s simple name. + */ + def fromAnyRef(o: AnyRef): (String, Class[_]) = o match { - case c: Class[_] ⇒ fromClass.genString(c) - case a: Actor ⇒ fromActor.genString(a) - case a: ActorRef ⇒ fromActorRef.genString(a) - case s: String ⇒ s - case x ⇒ simpleName(x) + case c: Class[_] ⇒ apply(c) + case a: Actor ⇒ apply(a) + case a: ActorRef ⇒ apply(a) + case s: String ⇒ apply(s) + case x ⇒ (simpleName(x), x.getClass) } - def fromAnyRef(o: AnyRef, system: ActorSystem): String = + /** + * construct string representation for any object according to + * rules above (including the actor system’s address) with fallback to its + * `Class`’s simple name. + */ + def fromAnyRef(o: AnyRef, system: ActorSystem): (String, Class[_]) = o match { - case c: Class[_] ⇒ fromClass.genString(c, system) - case a: Actor ⇒ fromActor.genString(a, system) - case a: ActorRef ⇒ fromActorRef.genString(a, system) - case s: String ⇒ fromString.genString(s, system) - case x ⇒ simpleName(x) + "(" + system + ")" + case c: Class[_] ⇒ apply(c) + case a: Actor ⇒ apply(a) + case a: ActorRef ⇒ apply(a) + case s: String ⇒ apply(s) + case x ⇒ (simpleName(x) + "(" + system + ")", x.getClass) } } @@ -322,140 +417,79 @@ object Logging { /** * Obtain LoggingAdapter for the given actor system and source object. This - * will use the system’s event stream. + * will use the system’s event stream and include the system’s address in the + * log source string. * - * The source is used to identify the source of this logging channel and must have - * a corresponding implicit LogSource[T] instance in scope; by default these are - * provided for Class[_], Actor, ActorRef and String types. By these, the source - * object is translated to a String according to the following rules: - *
      - *
    • if it is an Actor or ActorRef, its path is used
    • - *
    • in case of a String it is used as is
    • - *
    • in case of a class an approximation of its simpleName - *
    • and in all other cases the simpleName of its class
    • - *
    - * - * You can add your own rules quite easily: + * Do not use this if you want to supply a log category string (like + * “com.example.app.whatever”) unaltered, supply `system.eventStream` in this + * case or use * * {{{ - * trait MyType { // as an example - * def name: String - * } - * - * implicit val myLogSourceType: LogSource[MyType] = new LogSource { - * def genString(a: MyType) = a.name - * } - * - * class MyClass extends MyType { - * val log = Logging(eventStream, this) // will use "hallo" as logSource - * def name = "hallo" - * } + * Logging(system, this.getClass) * }}} + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. + * + * You can add your own rules quite easily, see [[akka.event.LogSource]]. */ - def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = - new BusLogging(system.eventStream, LogSource(logSource, system), logSource.getClass) + def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = { + val (str, clazz) = LogSource(logSource, system) + new BusLogging(system.eventStream, str, clazz) + } /** * Obtain LoggingAdapter for the given logging bus and source object. * - * The source is used to identify the source of this logging channel and must have - * a corresponding implicit LogSource[T] instance in scope; by default these are - * provided for Class[_], Actor, ActorRef and String types. By these, the source - * object is translated to a String according to the following rules: - *
      - *
    • if it is an Actor or ActorRef, its path is used
    • - *
    • in case of a String it is used as is
    • - *
    • in case of a class an approximation of its simpleName - *
    • and in all other cases the simpleName of its class
    • - *
    + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. * - * You can add your own rules quite easily: - * - * {{{ - * trait MyType { // as an example - * def name: String - * } - * - * implicit val myLogSourceType: LogSource[MyType] = new LogSource { - * def genString(a: MyType) = a.name - * } - * - * class MyClass extends MyType { - * val log = Logging(eventStream, this) // will use "hallo" as logSource - * def name = "hallo" - * } - * }}} + * You can add your own rules quite easily, see [[akka.event.LogSource]]. */ - def apply[T: LogSource](bus: LoggingBus, logSource: T): LoggingAdapter = - new BusLogging(bus, implicitly[LogSource[T]].genString(logSource), logSource.getClass) + def apply[T: LogSource](bus: LoggingBus, logSource: T): LoggingAdapter = { + val (str, clazz) = LogSource(logSource) + new BusLogging(bus, str, clazz) + } /** * Obtain LoggingAdapter for the given actor system and source object. This - * will use the system’s event stream. + * will use the system’s event stream and include the system’s address in the + * log source string. * - * The source is used to identify the source of this logging channel and must have - * a corresponding implicit LogSource[T] instance in scope; by default these are - * provided for Class[_], Actor, ActorRef and String types. By these, the source - * object is translated to a String according to the following rules: - *
      - *
    • if it is an Actor or ActorRef, its path is used
    • - *
    • in case of a String it is used as is
    • - *
    • in case of a class an approximation of its simpleName - *
    • and in all other cases the simpleName of its class
    • - *
    - * - * You can add your own rules quite easily: + * Do not use this if you want to supply a log category string (like + * “com.example.app.whatever”) unaltered, supply `system.eventStream` in this + * case or use * * {{{ - * trait MyType { // as an example - * def name: String - * } - * - * implicit val myLogSourceType: LogSource[MyType] = new LogSource { - * def genString(a: MyType) = a.name - * } - * - * class MyClass extends MyType { - * val log = Logging(eventStream, this) // will use "hallo" as logSource - * def name = "hallo" - * } + * Logging.getLogger(system, this.getClass()); * }}} + * + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. */ - def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system, LogSource.fromAnyRef(logSource, system)) + def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = { + val (str, clazz) = LogSource.fromAnyRef(logSource, system) + new BusLogging(system.eventStream, str, clazz) + } /** - * Obtain LoggingAdapter for the given logging bus and source object. This - * will use the system’s event stream. + * Obtain LoggingAdapter for the given logging bus and source object. * - * The source is used to identify the source of this logging channel and must have - * a corresponding implicit LogSource[T] instance in scope; by default these are - * provided for Class[_], Actor, ActorRef and String types. By these, the source - * object is translated to a String according to the following rules: - *
      - *
    • if it is an Actor or ActorRef, its path is used
    • - *
    • in case of a String it is used as is
    • - *
    • in case of a class an approximation of its simpleName - *
    • and in all other cases the simpleName of its class
    • - *
    - * - * You can add your own rules quite easily: - * - * {{{ - * trait MyType { // as an example - * def name: String - * } - * - * implicit val myLogSourceType: LogSource[MyType] = new LogSource { - * def genString(a: MyType) = a.name - * } - * - * class MyClass extends MyType { - * val log = Logging(eventStream, this) // will use "hallo" as logSource - * def name = "hallo" - * } - * }}} + * The source is used to identify the source of this logging channel and + * must have a corresponding implicit LogSource[T] instance in scope; by + * default these are provided for Class[_], Actor, ActorRef and String types. + * See the companion object of [[akka.event.LogSource]] for details. */ - //def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) + def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = { + val (str, clazz) = LogSource.fromAnyRef(logSource) + new BusLogging(bus, str, clazz) + } /** * Artificial exception injected into Error events if no Throwable is diff --git a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala index bb5a282856..27d829de5e 100644 --- a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala +++ b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala @@ -36,7 +36,8 @@ object LoggingReceive { class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - system.eventStream.publish(Debug(LogSource.fromAnyRef(source), source.getClass, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) + val (str, clazz) = LogSource.fromAnyRef(source) + system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) handled } def apply(o: Any): Unit = r(o) diff --git a/akka-docs/java/logging.rst b/akka-docs/java/logging.rst index aee644c175..ffee92d00e 100644 --- a/akka-docs/java/logging.rst +++ b/akka-docs/java/logging.rst @@ -17,8 +17,13 @@ as illustrated in this example: .. includecode:: code/akka/docs/event/LoggingDocTestBase.java :include: imports,my-actor -The second parameter to the ``Logging.getLogger`` is the source of this logging channel. -The source object is translated to a String according to the following rules: +The first parameter to ``Logging.getLogger`` could also be any +:class:`LoggingBus`, specifically ``system.eventStream()``; in the demonstrated +case, the actor system’s address is included in the ``akkaSource`` +representation of the log source (see `Logging Thread and Akka Source in MDC`_) +while in the second case this is not automatically done. The second parameter +to ``Logging.getLogger`` is the source of this logging channel. The source +object is translated to a String according to the following rules: * if it is an Actor or ActorRef, its path is used * in case of a String it is used as is @@ -28,6 +33,13 @@ The source object is translated to a String according to the following rules: The log message may contain argument placeholders ``{}``, which will be substituted if the log level is enabled. +The Java :class:`Class` of the log source is also included in the generated +:class:`LogEvent`. In case of a simple string this is replaced with a “marker” +class :class:`akka.event.DummyClassForStringSources` in order to allow special +treatment of this case, e.g. in the SLF4J event listener which will then use +the string instead of the class’ name for looking up the logger instance to +use. + Event Handler ============= @@ -96,6 +108,12 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi +.. note:: + + It will probably be a good idea to use the ``sourceThread`` MDC value also in + non-Akka parts of the application in order to have this property consistently + available in the logs. + Another helpful facility is that Akka captures the actor’s address when instantiating a logger within it, meaning that the full instance identification is available for associating log messages e.g. with members of a router. This diff --git a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala index c3c070d374..652c36af3f 100644 --- a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala @@ -47,6 +47,24 @@ object LoggingDocSpec { } //#my-event-listener + //#my-source + import akka.event.LogSource + import akka.actor.ActorSystem + + object MyType { + implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { + def genString(o: AnyRef): String = o.getClass.getName + override def getClazz(o: AnyRef): Class[_] = o.getClass + } + } + + class MyType(system: ActorSystem) { + import MyType._ + import akka.event.Logging + + val log = Logging(system, this) + } + //#my-source } class LoggingDocSpec extends AkkaSpec { diff --git a/akka-docs/scala/logging.rst b/akka-docs/scala/logging.rst index f4272c5da0..debafcedc5 100644 --- a/akka-docs/scala/logging.rst +++ b/akka-docs/scala/logging.rst @@ -22,6 +22,8 @@ For convenience you can mixin the ``log`` member into actors, instead of definin .. code-block:: scala class MyActor extends Actor with akka.actor.ActorLogging { + ... + } The second parameter to the ``Logging`` is the source of this logging channel. The source object is translated to a String according to the following rules: @@ -29,17 +31,46 @@ The source object is translated to a String according to the following rules: * if it is an Actor or ActorRef, its path is used * in case of a String it is used as is * in case of a class an approximation of its simpleName - * and in all other cases the simpleName of its class + * and in all other cases a compile error occurs unless and implicit + :class:`LogSource[T]` is in scope for the type in question. The log message may contain argument placeholders ``{}``, which will be substituted if the log level is enabled. +Translating Log Source to String and Class +------------------------------------------ + +The rules for translating the source object to the source string and class +which are inserted into the :class:`LogEvent` during runtime are implemented +using implicit parameters and thus fully customizable: simply create your own +instance of :class:`LogSource[T]` and have it in scope when creating the +logger. + +.. includecode:: code/akka/docs/event/LoggingDocSpec.scala#my-source + +This example creates a log source which mimics traditional usage of Java +loggers, which are based upon the originating object’s class name as log +category. The override of :meth:`getClazz` is only included for demonstration +purposes as it contains exactly the default behavior. + +.. note:: + + You may also create the string representation up front and pass that in as + the log source, but be aware that then the :class:`Class[_]` which will be + put in the :class:`LogEvent` is + :class:`akka.event.DummyClassForStringSources`. + + The SLF4J event listener treats this case specially (using the actual string + to look up the logger instance to use instead of the class’ name), and you + might want to do this also in case you implement your own loggin adapter. + Event Handler ============= -Logging is performed asynchronously through an event bus. You can configure which event handlers that should -subscribe to the logging events. That is done using the 'event-handlers' element in the :ref:`configuration`. -Here you can also define the log level. +Logging is performed asynchronously through an event bus. You can configure +which event handlers that should subscribe to the logging events. That is done +using the ``event-handlers`` element in the :ref:`configuration`. Here you can +also define the log level. .. code-block:: ruby @@ -50,7 +81,8 @@ Here you can also define the log level. loglevel = "DEBUG" } -The default one logs to STDOUT and is registered by default. It is not intended to be used for production. There is also an :ref:`slf4j-scala` +The default one logs to STDOUT and is registered by default. It is not intended +to be used for production. There is also an :ref:`slf4j-scala` event handler available in the 'akka-slf4j' module. Example of creating a listener: @@ -58,7 +90,6 @@ Example of creating a listener: .. includecode:: code/akka/docs/event/LoggingDocSpec.scala :include: my-event-listener - .. _slf4j-scala: SLF4J @@ -98,6 +129,12 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi +.. note:: + + It will probably be a good idea to use the ``sourceThread`` MDC value also in + non-Akka parts of the application in order to have this property consistently + available in the logs. + Another helpful facility is that Akka captures the actor’s address when instantiating a logger within it, meaning that the full instance identification is available for associating log messages e.g. with members of a router. This diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index 4831d78270..91a6cd7bf2 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -8,6 +8,7 @@ import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory } import org.slf4j.MDC import akka.event.Logging._ import akka.actor._ +import akka.event.DummyClassForStringSources /** * Base trait for all classes that wants to be able use the SLF4J logging infrastructure. @@ -19,7 +20,10 @@ trait SLF4JLogging { object Logger { def apply(logger: String): SLFLogger = SLFLoggerFactory getLogger logger - def apply(logClass: Class[_]): SLFLogger = SLFLoggerFactory getLogger logClass + def apply(logClass: Class[_], logSource: String): SLFLogger = logClass match { + case c if c == classOf[DummyClassForStringSources] ⇒ apply(logSource) + case _ ⇒ SLFLoggerFactory getLogger logClass + } def root: SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME) } @@ -39,24 +43,24 @@ class Slf4jEventHandler extends Actor with SLF4JLogging { case event @ Error(cause, logSource, logClass, message) ⇒ withMdc(logSource, event.thread.getName) { cause match { - case Error.NoCause ⇒ Logger(logClass).error(message.toString) - case _ ⇒ Logger(logClass).error(message.toString, cause) + case Error.NoCause ⇒ Logger(logClass, logSource).error(message.toString) + case _ ⇒ Logger(logClass, logSource).error(message.toString, cause) } } case event @ Warning(logSource, logClass, message) ⇒ withMdc(logSource, event.thread.getName) { - Logger(logClass).warn("{}", message.asInstanceOf[AnyRef]) + Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } case event @ Info(logSource, logClass, message) ⇒ withMdc(logSource, event.thread.getName) { - Logger(logClass).info("{}", message.asInstanceOf[AnyRef]) + Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } case event @ Debug(logSource, logClass, message) ⇒ withMdc(logSource, event.thread.getName) { - Logger(logClass).debug("{}", message.asInstanceOf[AnyRef]) + Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } case InitializeLogger(_) ⇒ From fcbbc892cb948baab491eacaaf9d9c92b0eb9757 Mon Sep 17 00:00:00 2001 From: viktorklang Date: Fri, 13 Jan 2012 20:55:12 +0100 Subject: [PATCH 11/11] typo --- akka-docs/intro/deployment-scenarios.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/akka-docs/intro/deployment-scenarios.rst b/akka-docs/intro/deployment-scenarios.rst index c76284d62c..f8d6f6b197 100644 --- a/akka-docs/intro/deployment-scenarios.rst +++ b/akka-docs/intro/deployment-scenarios.rst @@ -1,4 +1,3 @@ - .. _deployment-scenarios: ################################### @@ -28,7 +27,7 @@ Actors as services ^^^^^^^^^^^^^^^^^^ The simplest way you can use Akka is to use the actors as services in your Web -application. All that’s needed to do that is to put the Akka charts as well as +application. All that’s needed to do that is to put the Akka jars as well as its dependency jars into ``WEB-INF/lib``. You also need to put the :ref:`configuration` file in the ``$AKKA_HOME/config`` directory. Now you can create your Actors as regular services referenced from your Web application. You should also