diff --git a/akka-actor/src/main/scala/akka/routing/Router.scala b/akka-actor/src/main/scala/akka/routing/Router.scala index 600f1df9fe..3c8cc3f2c5 100644 --- a/akka-actor/src/main/scala/akka/routing/Router.scala +++ b/akka-actor/src/main/scala/akka/routing/Router.scala @@ -8,6 +8,7 @@ import akka.actor.ActorRef import akka.actor.ActorSelection import akka.actor.InternalActorRef import akka.japi.Util.immutableSeq +import akka.actor.NoSerializationVerificationNeeded /** * The interface of the routing logic that is used in a [[Router]] to select @@ -15,7 +16,7 @@ import akka.japi.Util.immutableSeq * * The implementation must be thread safe. */ -trait RoutingLogic { +trait RoutingLogic extends NoSerializationVerificationNeeded { /** * Pick the destination for a given message. Normally it picks one of the * passed `routees`, but in the end it is up to the implementation to diff --git a/akka-contrib/docs/distributed-pub-sub.rst b/akka-contrib/docs/distributed-pub-sub.rst index 5621fc51ba..4299e908b4 100644 --- a/akka-contrib/docs/distributed-pub-sub.rst +++ b/akka-contrib/docs/distributed-pub-sub.rst @@ -28,13 +28,14 @@ any other node. There is three modes of message delivery. **1. DistributedPubSubMediator.Send** The message will be delivered to one recipient with a matching path, if any such -exists in the registry. If several entries match the path the message will be delivered -to one random destination. The sender of the message can specify that local -affinity is preferred, i.e. the message is sent to an actor in the same local actor -system as the used mediator actor, if any such exists, otherwise random to any other -matching entry. A typical usage of this mode is private chat to one other user in -an instant messaging application. It can also be used for distributing tasks to workers, -like a random router. +exists in the registry. If several entries match the path the message will be sent +via the supplied ``RoutingLogic`` (default random) to one destination. The sender of the +message can specify that local affinity is preferred, i.e. the message is sent to an actor +in the same local actor system as the used mediator actor, if any such exists, otherwise +route to any other matching entry. A typical usage of this mode is private chat to one +other user in an instant messaging application. It can also be used for distributing +tasks to registered workers, like a cluster aware router where the routees dynamically +can register themselves. **2. DistributedPubSubMediator.SendToAll** diff --git a/akka-contrib/docs/reliable-proxy.rst b/akka-contrib/docs/reliable-proxy.rst index 4bba12f9eb..469758c13b 100644 --- a/akka-contrib/docs/reliable-proxy.rst +++ b/akka-contrib/docs/reliable-proxy.rst @@ -71,12 +71,12 @@ Since this implementation does not offer much in the way of configuration, simply instantiate a proxy wrapping some target reference. From Java it looks like this: -.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ReliableProxyTest.java#import -.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ReliableProxyTest.java#demo-proxy +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ReliableProxyTest.java + :include: import,demo-proxy And from Scala like this: -.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala#demo +.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala#demo Since the :class:`ReliableProxy` actor is an :ref:`fsm-scala`, it also offers the capability to subscribe to state transitions. If you need to know when all diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index 7ccfd8d04b..4c9aa46d83 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -14,6 +14,10 @@ akka.contrib.cluster.pub-sub { # Start the mediator on members tagged with this role. # All members are used if undefined or empty. role = "" + + # The routing logic to use for 'Send' + # Possible values: random, round-robin, consistent-hashing, broadcast + routing-logic = random # How often the DistributedPubSubMediator should send out gossip information gossip-interval = 1s diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 51811fd934..c45863cd9a 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -8,7 +8,6 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import java.net.URLEncoder - import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorPath @@ -25,6 +24,14 @@ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.Member import akka.cluster.MemberStatus +import akka.routing.RandomRoutingLogic +import akka.routing.RoutingLogic +import akka.routing.Routee +import akka.routing.ActorRefRoutee +import akka.routing.Router +import akka.routing.RoundRobinRoutingLogic +import akka.routing.ConsistentHashingRoutingLogic +import akka.routing.BroadcastRoutingLogic object DistributedPubSubMediator { @@ -33,18 +40,20 @@ object DistributedPubSubMediator { */ def props( role: Option[String], + routingLogic: RoutingLogic = RandomRoutingLogic(), gossipInterval: FiniteDuration = 1.second, removedTimeToLive: FiniteDuration = 2.minutes): Props = - Props(classOf[DistributedPubSubMediator], role, gossipInterval, removedTimeToLive) + Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive) /** * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]. */ def props( role: String, + routingLogic: RoutingLogic, gossipInterval: FiniteDuration, removedTimeToLive: FiniteDuration): Props = - props(Internal.roleOption(role), gossipInterval, removedTimeToLive) + props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive) /** * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]] @@ -59,7 +68,12 @@ object DistributedPubSubMediator { @SerialVersionUID(1L) case class SubscribeAck(subscribe: Subscribe) @SerialVersionUID(1L) case class UnsubscribeAck(unsubscribe: Unsubscribe) @SerialVersionUID(1L) case class Publish(topic: String, msg: Any) - @SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean) + @SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean) { + /** + * Convenience constructor with `localAffinity` false + */ + def this(path: String, msg: Any) = this(path, msg, localAffinity = false) + } @SerialVersionUID(1L) case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false) { def this(path: String, msg: Any) = this(path, msg, allButSelf = false) } @@ -80,7 +94,9 @@ object DistributedPubSubMediator { content: Map[String, ValueHolder]) @SerialVersionUID(1L) - case class ValueHolder(version: Long, ref: Option[ActorRef]) + case class ValueHolder(version: Long, ref: Option[ActorRef]) { + @transient lazy val routee: Option[Routee] = ref map ActorRefRoutee + } @SerialVersionUID(1L) case class Status(versions: Map[Address, Long]) @@ -156,13 +172,14 @@ object DistributedPubSubMediator { * * 1. [[DistributedPubSubMediator.Send]] - * The message will be delivered to one recipient with a matching path, if any such - * exists in the registry. If several entries match the path the message will be delivered - * to one random destination. The sender of the message can specify that local - * affinity is preferred, i.e. the message is sent to an actor in the same local actor - * system as the used mediator actor, if any such exists, otherwise random to any other - * matching entry. A typical usage of this mode is private chat to one other user in - * an instant messaging application. It can also be used for distributing tasks to workers, - * like a random router. + * exists in the registry. If several entries match the path the message will be sent + * via the supplied `routingLogic` (default random) to one destination. The sender of the + * message can specify that local affinity is preferred, i.e. the message is sent to an actor + * in the same local actor system as the used mediator actor, if any such exists, otherwise + * route to any other matching entry. A typical usage of this mode is private chat to one + * other user in an instant messaging application. It can also be used for distributing + * tasks to registered workers, like a cluster aware router where the routees dynamically + * can register themselves. * * 2. [[DistributedPubSubMediator.SendToAll]] - * The message will be delivered to all recipients with a matching path. Actors with @@ -194,6 +211,7 @@ object DistributedPubSubMediator { */ class DistributedPubSubMediator( role: Option[String], + routingLogic: RoutingLogic, gossipInterval: FiniteDuration, removedTimeToLive: FiniteDuration) extends Actor with ActorLogging { @@ -250,14 +268,14 @@ class DistributedPubSubMediator( case Some(ValueHolder(_, Some(ref))) if localAffinity ⇒ ref forward msg case _ ⇒ - val refs = (for { + val routees = (for { (_, bucket) ← registry valueHolder ← bucket.content.get(path) - ref ← valueHolder.ref - } yield ref).toVector + routee ← valueHolder.routee + } yield routee).toVector - if (refs.nonEmpty) - refs(ThreadLocalRandom.current.nextInt(refs.size)) forward msg + if (routees.nonEmpty) + Router(routingLogic, routees).route(msg, sender) } case SendToAll(path, msg, skipSenderNode) ⇒ @@ -459,10 +477,16 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension if (isTerminated) system.deadLetters else { + val routingLogic = config.getString("routing-logic") match { + case "random" ⇒ RandomRoutingLogic() + case "round-robin" ⇒ RoundRobinRoutingLogic() + case "consistent-hashing" ⇒ ConsistentHashingRoutingLogic(system) + case "broadcast" ⇒ BroadcastRoutingLogic() + } val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS) val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS) val name = config.getString("name") - system.actorOf(DistributedPubSubMediator.props(role, gossipInterval, removedTimeToLive), + system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive), name) } } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala index f7803a3138..1bd746a9cb 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala @@ -67,18 +67,14 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod enterBarrier("initialize") runOn(local) { - //#demo import akka.contrib.pattern.ReliableProxy system.actorSelection(node(remote) / "user" / "echo") ! Identify("echo") target = expectMsgType[ActorIdentity].ref.get proxy = system.actorOf(Props(classOf[ReliableProxy], target, 100.millis), "proxy") - //#demo proxy ! FSM.SubscribeTransitionCallBack(testActor) expectState(Idle) - //#demo proxy ! "hello" - //#demo expectTransition(Idle, Active) expectTransition(Active, Idle) } diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java index 496855680e..dd933aec2e 100644 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java @@ -4,56 +4,90 @@ package akka.contrib.pattern; +import java.util.concurrent.TimeUnit; + import akka.testkit.AkkaJUnitActorSystemResource; + import org.junit.ClassRule; import org.junit.Test; import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; -import akka.actor.Actor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.FSM; import akka.actor.Props; import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; import akka.testkit.TestProbe; //#import import akka.contrib.pattern.ReliableProxy; + + //#import public class ReliableProxyTest { @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("ReliableProxyTest"); + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReliableProxyTest"); private final ActorSystem system = actorSystemResource.getSystem(); + static//#demo-proxy + public class ProxyParent extends UntypedActor { + private final ActorRef proxy; + + public ProxyParent(ActorRef target) { + proxy = getContext().actorOf( + Props.create(ReliableProxy.class, target, + Duration.create(100, TimeUnit.MILLISECONDS))); + } + + public void onReceive(Object msg) { + if ("hello".equals(msg)) { + proxy.tell("world!", getSelf()); + } + } + } + + //#demo-proxy + + static//#demo-transition + public class ProxyTransitionParent extends UntypedActor { + private final ActorRef proxy; + private ActorRef client = null; + + public ProxyTransitionParent(ActorRef target) { + proxy = getContext().actorOf( + Props.create(ReliableProxy.class, target, + Duration.create(100, TimeUnit.MILLISECONDS))); + proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf()); + } + + public void onReceive(Object msg) { + if ("hello".equals(msg)) { + proxy.tell("world!", getSelf()); + client = getSender(); + } else if (msg instanceof FSM.CurrentState) { + // get initial state + } else if (msg instanceof FSM.Transition) { + @SuppressWarnings("unchecked") + final FSM.Transition transition = + (FSM.Transition) msg; + assert transition.fsmRef().equals(proxy); + if (transition.to().equals(ReliableProxy.idle())) { + client.tell("done", getSelf()); + } + } + } + } + + //#demo-transition + @Test public void demonstrateUsage() { final TestProbe probe = TestProbe.apply(system); final ActorRef target = probe.ref(); - final ActorRef parent = system.actorOf(new Props(new UntypedActorFactory() { - private static final long serialVersionUID = 1L; - - public Actor create() { - return new UntypedActor() { - - //#demo-proxy - final ActorRef proxy = getContext().actorOf( - Props.create(ReliableProxy.class, target, Duration.create(100, "millis"))); - - public void onReceive(Object msg) { - if ("hello".equals(msg)) { - proxy.tell("world!", getSelf()); - } - } - //#demo-proxy - }; - } - })); + final ActorRef parent = system.actorOf(Props.create(ProxyParent.class, target)); parent.tell("hello", null); probe.expectMsg("world!"); } @@ -61,40 +95,7 @@ public class ReliableProxyTest { @Test public void demonstrateTransitions() { final ActorRef target = system.deadLetters(); - final ActorRef parent = system.actorOf(new Props(new UntypedActorFactory() { - private static final long serialVersionUID = 1L; - - public Actor create() { - return new UntypedActor() { - - //#demo-transition - final ActorRef proxy = getContext().actorOf(Props.create(ReliableProxy.class, target, - Duration.create(100, "millis"))); - ActorRef client = null; - { - proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf()); - } - - public void onReceive(Object msg) { - if ("hello".equals(msg)) { - proxy.tell("world!", getSelf()); - client = getSender(); - } else if (msg instanceof FSM.CurrentState) { - // get initial state - } else if (msg instanceof FSM.Transition) { - @SuppressWarnings("unchecked") - final FSM.Transition transition = - (FSM.Transition) msg; - assert transition.fsmRef().equals(proxy); - if (transition.to().equals(ReliableProxy.idle())) { - client.tell("done", getSelf()); - } - } - } - //#demo-transition - }; - } - })); + final ActorRef parent = system.actorOf(Props.create(ProxyTransitionParent.class, target)); final TestProbe probe = TestProbe.apply(system); parent.tell("hello", probe.ref()); probe.expectMsg("done"); diff --git a/akka-contrib/src/test/resources/reference.conf b/akka-contrib/src/test/resources/reference.conf new file mode 100644 index 0000000000..ab48718a51 --- /dev/null +++ b/akka-contrib/src/test/resources/reference.conf @@ -0,0 +1,6 @@ +akka { + actor { + serialize-creators = on + serialize-messages = on + } +} \ No newline at end of file diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala index 6e442114c3..f9c13cfe39 100644 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala @@ -11,28 +11,58 @@ import akka.testkit.ImplicitSender import scala.concurrent.duration._ import akka.actor.FSM import akka.actor.ActorRef +import akka.testkit.TestProbe + +object ReliableProxyDocSpec { + + //#demo + import akka.contrib.pattern.ReliableProxy + + class ProxyParent(target: ActorRef) extends Actor { + val proxy = context.actorOf(Props(classOf[ReliableProxy], target, 100.millis)) + + def receive = { + case "hello" ⇒ proxy ! "world!" + } + } + //#demo + + //#demo-transition + class ProxyTransitionParent(target: ActorRef) extends Actor { + val proxy = context.actorOf(Props(classOf[ReliableProxy], target, 100.millis)) + proxy ! FSM.SubscribeTransitionCallBack(self) + + var client: ActorRef = _ + + def receive = { + case "go" ⇒ + proxy ! 42 + client = sender + case FSM.CurrentState(`proxy`, initial) ⇒ + case FSM.Transition(`proxy`, from, to) ⇒ + if (to == ReliableProxy.Idle) + client ! "done" + } + } + //#demo-transition +} class ReliableProxyDocSpec extends AkkaSpec with ImplicitSender { + import ReliableProxyDocSpec._ + "A ReliableProxy" must { + "show usage" in { + val target = testActor + val a = system.actorOf(Props(classOf[ProxyParent], target)) + a ! "hello" + expectMsg("world!") + } + "show state transitions" in { val target = system.deadLetters - val a = system.actorOf(Props(new Actor { - //#demo-transition - val proxy = context.actorOf(Props(classOf[ReliableProxy], target, 100.millis)) - proxy ! FSM.SubscribeTransitionCallBack(self) - - var client: ActorRef = _ - - def receive = { - case "go" ⇒ proxy ! 42; client = sender - case FSM.CurrentState(`proxy`, initial) ⇒ - case FSM.Transition(`proxy`, from, to) ⇒ if (to == ReliableProxy.Idle) - client ! "done" - } - //#demo-transition - })) + val a = system.actorOf(Props(classOf[ProxyTransitionParent], target)) a ! "go" expectMsg("done") } diff --git a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala index 2024ca595c..86427cc7e3 100644 --- a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala @@ -25,12 +25,26 @@ object TimerBasedThrottlerSpec { case x ⇒ sender ! x } } + + def println(a: Any) = () + + //#demo-code + // A simple actor that prints whatever it receives + class PrintActor extends Actor { + def receive = { + case x ⇒ println(x) + } + } + + //#demo-code } @RunWith(classOf[JUnitRunner]) class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSpec")) with ImplicitSender with WordSpecLike with MustMatchers with BeforeAndAfterAll { + import TimerBasedThrottlerSpec._ + override def afterAll { shutdown(system) } @@ -39,12 +53,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp def println(a: Any) = () "pass the ScalaDoc class documentation example program" in { //#demo-code - // A simple actor that prints whatever it receives - val printer = system.actorOf(Props(new Actor { - def receive = { - case x ⇒ println(x) - } - })) + val printer = system.actorOf(Props[PrintActor]) // The throttler for this example, setting the rate val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second))