Merge pull request #1778 from akka/wip-3669-routing-in-pubsub-patriknw

+con #3669 Possibility to define routing logic in DistributedPubSub
This commit is contained in:
Patrik Nordwall 2013-10-17 05:09:59 -07:00
commit beff53f0a6
10 changed files with 184 additions and 112 deletions

View file

@ -8,6 +8,7 @@ import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.InternalActorRef
import akka.japi.Util.immutableSeq
import akka.actor.NoSerializationVerificationNeeded
/**
* The interface of the routing logic that is used in a [[Router]] to select
@ -15,7 +16,7 @@ import akka.japi.Util.immutableSeq
*
* The implementation must be thread safe.
*/
trait RoutingLogic {
trait RoutingLogic extends NoSerializationVerificationNeeded {
/**
* Pick the destination for a given message. Normally it picks one of the
* passed `routees`, but in the end it is up to the implementation to

View file

@ -28,13 +28,14 @@ any other node. There is three modes of message delivery.
**1. DistributedPubSubMediator.Send**
The message will be delivered to one recipient with a matching path, if any such
exists in the registry. If several entries match the path the message will be delivered
to one random destination. The sender of the message can specify that local
affinity is preferred, i.e. the message is sent to an actor in the same local actor
system as the used mediator actor, if any such exists, otherwise random to any other
matching entry. A typical usage of this mode is private chat to one other user in
an instant messaging application. It can also be used for distributing tasks to workers,
like a random router.
exists in the registry. If several entries match the path the message will be sent
via the supplied ``RoutingLogic`` (default random) to one destination. The sender of the
message can specify that local affinity is preferred, i.e. the message is sent to an actor
in the same local actor system as the used mediator actor, if any such exists, otherwise
route to any other matching entry. A typical usage of this mode is private chat to one
other user in an instant messaging application. It can also be used for distributing
tasks to registered workers, like a cluster aware router where the routees dynamically
can register themselves.
**2. DistributedPubSubMediator.SendToAll**

View file

@ -71,12 +71,12 @@ Since this implementation does not offer much in the way of configuration,
simply instantiate a proxy wrapping some target reference. From Java it looks
like this:
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ReliableProxyTest.java#import
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ReliableProxyTest.java#demo-proxy
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ReliableProxyTest.java
:include: import,demo-proxy
And from Scala like this:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala#demo
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala#demo
Since the :class:`ReliableProxy` actor is an :ref:`fsm-scala`, it also offers
the capability to subscribe to state transitions. If you need to know when all

View file

@ -14,6 +14,10 @@ akka.contrib.cluster.pub-sub {
# Start the mediator on members tagged with this role.
# All members are used if undefined or empty.
role = ""
# The routing logic to use for 'Send'
# Possible values: random, round-robin, consistent-hashing, broadcast
routing-logic = random
# How often the DistributedPubSubMediator should send out gossip information
gossip-interval = 1s

View file

@ -8,7 +8,6 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.net.URLEncoder
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorPath
@ -25,6 +24,14 @@ import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.routing.RandomRoutingLogic
import akka.routing.RoutingLogic
import akka.routing.Routee
import akka.routing.ActorRefRoutee
import akka.routing.Router
import akka.routing.RoundRobinRoutingLogic
import akka.routing.ConsistentHashingRoutingLogic
import akka.routing.BroadcastRoutingLogic
object DistributedPubSubMediator {
@ -33,18 +40,20 @@ object DistributedPubSubMediator {
*/
def props(
role: Option[String],
routingLogic: RoutingLogic = RandomRoutingLogic(),
gossipInterval: FiniteDuration = 1.second,
removedTimeToLive: FiniteDuration = 2.minutes): Props =
Props(classOf[DistributedPubSubMediator], role, gossipInterval, removedTimeToLive)
Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive)
/**
* Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]].
*/
def props(
role: String,
routingLogic: RoutingLogic,
gossipInterval: FiniteDuration,
removedTimeToLive: FiniteDuration): Props =
props(Internal.roleOption(role), gossipInterval, removedTimeToLive)
props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive)
/**
* Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]
@ -59,7 +68,12 @@ object DistributedPubSubMediator {
@SerialVersionUID(1L) case class SubscribeAck(subscribe: Subscribe)
@SerialVersionUID(1L) case class UnsubscribeAck(unsubscribe: Unsubscribe)
@SerialVersionUID(1L) case class Publish(topic: String, msg: Any)
@SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean)
@SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean) {
/**
* Convenience constructor with `localAffinity` false
*/
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}
@SerialVersionUID(1L) case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false) {
def this(path: String, msg: Any) = this(path, msg, allButSelf = false)
}
@ -80,7 +94,9 @@ object DistributedPubSubMediator {
content: Map[String, ValueHolder])
@SerialVersionUID(1L)
case class ValueHolder(version: Long, ref: Option[ActorRef])
case class ValueHolder(version: Long, ref: Option[ActorRef]) {
@transient lazy val routee: Option[Routee] = ref map ActorRefRoutee
}
@SerialVersionUID(1L)
case class Status(versions: Map[Address, Long])
@ -156,13 +172,14 @@ object DistributedPubSubMediator {
*
* 1. [[DistributedPubSubMediator.Send]] -
* The message will be delivered to one recipient with a matching path, if any such
* exists in the registry. If several entries match the path the message will be delivered
* to one random destination. The sender of the message can specify that local
* affinity is preferred, i.e. the message is sent to an actor in the same local actor
* system as the used mediator actor, if any such exists, otherwise random to any other
* matching entry. A typical usage of this mode is private chat to one other user in
* an instant messaging application. It can also be used for distributing tasks to workers,
* like a random router.
* exists in the registry. If several entries match the path the message will be sent
* via the supplied `routingLogic` (default random) to one destination. The sender of the
* message can specify that local affinity is preferred, i.e. the message is sent to an actor
* in the same local actor system as the used mediator actor, if any such exists, otherwise
* route to any other matching entry. A typical usage of this mode is private chat to one
* other user in an instant messaging application. It can also be used for distributing
* tasks to registered workers, like a cluster aware router where the routees dynamically
* can register themselves.
*
* 2. [[DistributedPubSubMediator.SendToAll]] -
* The message will be delivered to all recipients with a matching path. Actors with
@ -194,6 +211,7 @@ object DistributedPubSubMediator {
*/
class DistributedPubSubMediator(
role: Option[String],
routingLogic: RoutingLogic,
gossipInterval: FiniteDuration,
removedTimeToLive: FiniteDuration)
extends Actor with ActorLogging {
@ -250,14 +268,14 @@ class DistributedPubSubMediator(
case Some(ValueHolder(_, Some(ref))) if localAffinity
ref forward msg
case _
val refs = (for {
val routees = (for {
(_, bucket) registry
valueHolder bucket.content.get(path)
ref valueHolder.ref
} yield ref).toVector
routee valueHolder.routee
} yield routee).toVector
if (refs.nonEmpty)
refs(ThreadLocalRandom.current.nextInt(refs.size)) forward msg
if (routees.nonEmpty)
Router(routingLogic, routees).route(msg, sender)
}
case SendToAll(path, msg, skipSenderNode)
@ -459,10 +477,16 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension
if (isTerminated)
system.deadLetters
else {
val routingLogic = config.getString("routing-logic") match {
case "random" RandomRoutingLogic()
case "round-robin" RoundRobinRoutingLogic()
case "consistent-hashing" ConsistentHashingRoutingLogic(system)
case "broadcast" BroadcastRoutingLogic()
}
val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS)
val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS)
val name = config.getString("name")
system.actorOf(DistributedPubSubMediator.props(role, gossipInterval, removedTimeToLive),
system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive),
name)
}
}

View file

@ -67,18 +67,14 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
enterBarrier("initialize")
runOn(local) {
//#demo
import akka.contrib.pattern.ReliableProxy
system.actorSelection(node(remote) / "user" / "echo") ! Identify("echo")
target = expectMsgType[ActorIdentity].ref.get
proxy = system.actorOf(Props(classOf[ReliableProxy], target, 100.millis), "proxy")
//#demo
proxy ! FSM.SubscribeTransitionCallBack(testActor)
expectState(Idle)
//#demo
proxy ! "hello"
//#demo
expectTransition(Idle, Active)
expectTransition(Active, Idle)
}

View file

@ -4,56 +4,90 @@
package akka.contrib.pattern;
import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.FSM;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.testkit.TestProbe;
//#import
import akka.contrib.pattern.ReliableProxy;
//#import
public class ReliableProxyTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ReliableProxyTest");
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReliableProxyTest");
private final ActorSystem system = actorSystemResource.getSystem();
static//#demo-proxy
public class ProxyParent extends UntypedActor {
private final ActorRef proxy;
public ProxyParent(ActorRef target) {
proxy = getContext().actorOf(
Props.create(ReliableProxy.class, target,
Duration.create(100, TimeUnit.MILLISECONDS)));
}
public void onReceive(Object msg) {
if ("hello".equals(msg)) {
proxy.tell("world!", getSelf());
}
}
}
//#demo-proxy
static//#demo-transition
public class ProxyTransitionParent extends UntypedActor {
private final ActorRef proxy;
private ActorRef client = null;
public ProxyTransitionParent(ActorRef target) {
proxy = getContext().actorOf(
Props.create(ReliableProxy.class, target,
Duration.create(100, TimeUnit.MILLISECONDS)));
proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf());
}
public void onReceive(Object msg) {
if ("hello".equals(msg)) {
proxy.tell("world!", getSelf());
client = getSender();
} else if (msg instanceof FSM.CurrentState<?>) {
// get initial state
} else if (msg instanceof FSM.Transition<?>) {
@SuppressWarnings("unchecked")
final FSM.Transition<ReliableProxy.State> transition =
(FSM.Transition<ReliableProxy.State>) msg;
assert transition.fsmRef().equals(proxy);
if (transition.to().equals(ReliableProxy.idle())) {
client.tell("done", getSelf());
}
}
}
}
//#demo-transition
@Test
public void demonstrateUsage() {
final TestProbe probe = TestProbe.apply(system);
final ActorRef target = probe.ref();
final ActorRef parent = system.actorOf(new Props(new UntypedActorFactory() {
private static final long serialVersionUID = 1L;
public Actor create() {
return new UntypedActor() {
//#demo-proxy
final ActorRef proxy = getContext().actorOf(
Props.create(ReliableProxy.class, target, Duration.create(100, "millis")));
public void onReceive(Object msg) {
if ("hello".equals(msg)) {
proxy.tell("world!", getSelf());
}
}
//#demo-proxy
};
}
}));
final ActorRef parent = system.actorOf(Props.create(ProxyParent.class, target));
parent.tell("hello", null);
probe.expectMsg("world!");
}
@ -61,40 +95,7 @@ public class ReliableProxyTest {
@Test
public void demonstrateTransitions() {
final ActorRef target = system.deadLetters();
final ActorRef parent = system.actorOf(new Props(new UntypedActorFactory() {
private static final long serialVersionUID = 1L;
public Actor create() {
return new UntypedActor() {
//#demo-transition
final ActorRef proxy = getContext().actorOf(Props.create(ReliableProxy.class, target,
Duration.create(100, "millis")));
ActorRef client = null;
{
proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf());
}
public void onReceive(Object msg) {
if ("hello".equals(msg)) {
proxy.tell("world!", getSelf());
client = getSender();
} else if (msg instanceof FSM.CurrentState<?>) {
// get initial state
} else if (msg instanceof FSM.Transition<?>) {
@SuppressWarnings("unchecked")
final FSM.Transition<ReliableProxy.State> transition =
(FSM.Transition<ReliableProxy.State>) msg;
assert transition.fsmRef().equals(proxy);
if (transition.to().equals(ReliableProxy.idle())) {
client.tell("done", getSelf());
}
}
}
//#demo-transition
};
}
}));
final ActorRef parent = system.actorOf(Props.create(ProxyTransitionParent.class, target));
final TestProbe probe = TestProbe.apply(system);
parent.tell("hello", probe.ref());
probe.expectMsg("done");

View file

@ -0,0 +1,6 @@
akka {
actor {
serialize-creators = on
serialize-messages = on
}
}

View file

@ -11,28 +11,58 @@ import akka.testkit.ImplicitSender
import scala.concurrent.duration._
import akka.actor.FSM
import akka.actor.ActorRef
import akka.testkit.TestProbe
object ReliableProxyDocSpec {
//#demo
import akka.contrib.pattern.ReliableProxy
class ProxyParent(target: ActorRef) extends Actor {
val proxy = context.actorOf(Props(classOf[ReliableProxy], target, 100.millis))
def receive = {
case "hello" proxy ! "world!"
}
}
//#demo
//#demo-transition
class ProxyTransitionParent(target: ActorRef) extends Actor {
val proxy = context.actorOf(Props(classOf[ReliableProxy], target, 100.millis))
proxy ! FSM.SubscribeTransitionCallBack(self)
var client: ActorRef = _
def receive = {
case "go"
proxy ! 42
client = sender
case FSM.CurrentState(`proxy`, initial)
case FSM.Transition(`proxy`, from, to)
if (to == ReliableProxy.Idle)
client ! "done"
}
}
//#demo-transition
}
class ReliableProxyDocSpec extends AkkaSpec with ImplicitSender {
import ReliableProxyDocSpec._
"A ReliableProxy" must {
"show usage" in {
val target = testActor
val a = system.actorOf(Props(classOf[ProxyParent], target))
a ! "hello"
expectMsg("world!")
}
"show state transitions" in {
val target = system.deadLetters
val a = system.actorOf(Props(new Actor {
//#demo-transition
val proxy = context.actorOf(Props(classOf[ReliableProxy], target, 100.millis))
proxy ! FSM.SubscribeTransitionCallBack(self)
var client: ActorRef = _
def receive = {
case "go" proxy ! 42; client = sender
case FSM.CurrentState(`proxy`, initial)
case FSM.Transition(`proxy`, from, to) if (to == ReliableProxy.Idle)
client ! "done"
}
//#demo-transition
}))
val a = system.actorOf(Props(classOf[ProxyTransitionParent], target))
a ! "go"
expectMsg("done")
}

View file

@ -25,12 +25,26 @@ object TimerBasedThrottlerSpec {
case x sender ! x
}
}
def println(a: Any) = ()
//#demo-code
// A simple actor that prints whatever it receives
class PrintActor extends Actor {
def receive = {
case x println(x)
}
}
//#demo-code
}
@RunWith(classOf[JUnitRunner])
class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSpec")) with ImplicitSender
with WordSpecLike with MustMatchers with BeforeAndAfterAll {
import TimerBasedThrottlerSpec._
override def afterAll {
shutdown(system)
}
@ -39,12 +53,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp
def println(a: Any) = ()
"pass the ScalaDoc class documentation example program" in {
//#demo-code
// A simple actor that prints whatever it receives
val printer = system.actorOf(Props(new Actor {
def receive = {
case x println(x)
}
}))
val printer = system.actorOf(Props[PrintActor])
// The throttler for this example, setting the rate
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler],
3 msgsPer 1.second))