diff --git a/akka-contrib/docs/cluster-client.rst b/akka-contrib/docs/cluster-client.rst index 8bc06edc05..f4a769669b 100644 --- a/akka-contrib/docs/cluster-client.rst +++ b/akka-contrib/docs/cluster-client.rst @@ -50,6 +50,15 @@ The ``sender`` of the response messages, as seen by the client, is preserved as the original sender, so the client can choose to send subsequent messages directly to the actor in the cluster. +While establishing a connection to a receptionist the ``ClusterClient`` will buffer +messages and send them when the connection is established. If the buffer is full +the ``ClusterClient`` will throw ``akka.actor.StashOverflowException``, which can be +handled in by the supervision strategy of the parent actor. The size of the buffer +can be configured by the following ``stash-capacity`` setting of the mailbox that is +used by the ``ClusterClient`` actor. + +.. includecode:: @contribSrc@/src/main/resources/reference.conf#cluster-client-mailbox-config + An Example ---------- diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index fa680512ef..7ccfd8d04b 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -42,3 +42,12 @@ akka.contrib.cluster.receptionist { response-tunnel-receive-timeout = 30s } # //#receptionist-ext-config + +# //#cluster-client-mailbox-config +akka.contrib.cluster.client { + mailbox { + mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" + stash-capacity = 1000 + } +} +# //#cluster-client-mailbox-config diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala index 0c9829b988..b42332f7ab 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala @@ -27,6 +27,7 @@ import akka.cluster.Member import akka.cluster.MemberStatus import akka.routing.ConsistentHash import akka.routing.MurmurHash +import akka.actor.Stash object ClusterClient { @@ -37,7 +38,8 @@ object ClusterClient { initialContacts: Set[ActorSelection], pingInterval: FiniteDuration = 3.second, refreshContactsInterval: FiniteDuration = 1.minute): Props = - Props(classOf[ClusterClient], initialContacts, pingInterval, refreshContactsInterval) + Props(classOf[ClusterClient], initialContacts, pingInterval, refreshContactsInterval). + withMailbox("akka.contrib.cluster.client.mailbox") /** * Java API: Factory method for `ClusterClient` [[akka.actor.Props]]. @@ -112,7 +114,7 @@ class ClusterClient( initialContacts: Set[ActorSelection], pingInterval: FiniteDuration, refreshContactsInterval: FiniteDuration) - extends Actor with ActorLogging { + extends Actor with Stash with ActorLogging { import ClusterClient._ import ClusterClient.Internal._ @@ -143,12 +145,13 @@ class ClusterClient( case ActorIdentity(_, Some(receptionist)) ⇒ context watch receptionist log.info("Connected to [{}]", receptionist.path) + unstashAll() context.become(active(receptionist)) case ActorIdentity(_, None) ⇒ // ok, use another instead case PingTick ⇒ sendGetContacts() case Pong ⇒ case RefreshContactsTick ⇒ - case msg ⇒ context.system.deadLetters forward msg + case msg ⇒ stash() } def active(receptionist: ActorRef): Actor.Receive = { diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala index 242cee629a..d4dfcc77da 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala @@ -103,11 +103,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod "communicate to actor on any node in cluster" in within(10 seconds) { runOn(client) { val c = system.actorOf(ClusterClient.props(initialContacts)) - - awaitAssert { - c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) - expectMsg(1 second, "ack") - } + c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) + expectMsg("ack") } runOn(fourth) { expectMsg("hello") @@ -133,6 +130,11 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod } //#server + runOn(host1, host2, host3) { + awaitCount(4) + } + enterBarrier("services-replicated") + //#client runOn(client) { val c = system.actorOf(ClusterClient.props(initialContacts)) @@ -141,6 +143,11 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod } //#client + runOn(client) { + // note that "hi" was sent to 2 "serviceB" + receiveN(3).toSet must be(Set("hello", "hi")) + } + { //not used, only demo //#initialContacts val initialContacts = Set( @@ -165,10 +172,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod runOn(client) { val c = system.actorOf(ClusterClient.props(initialContacts)) - awaitAssert { - c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true) - expectMsg(1 second, "ack") - } + c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true) + expectMsg("ack") val lastSenderAddress = lastSender.path.address val receptionistRoleName = roleName(lastSenderAddress) match { case Some(r) ⇒ r