+con #3551 Buffer messages in cluster client
* Stash when establishing connection to receptionist
This commit is contained in:
parent
beba5d9f76
commit
09e3953c9f
4 changed files with 38 additions and 12 deletions
|
|
@ -50,6 +50,15 @@ The ``sender`` of the response messages, as seen by the client, is preserved
|
|||
as the original sender, so the client can choose to send subsequent messages
|
||||
directly to the actor in the cluster.
|
||||
|
||||
While establishing a connection to a receptionist the ``ClusterClient`` will buffer
|
||||
messages and send them when the connection is established. If the buffer is full
|
||||
the ``ClusterClient`` will throw ``akka.actor.StashOverflowException``, which can be
|
||||
handled in by the supervision strategy of the parent actor. The size of the buffer
|
||||
can be configured by the following ``stash-capacity`` setting of the mailbox that is
|
||||
used by the ``ClusterClient`` actor.
|
||||
|
||||
.. includecode:: @contribSrc@/src/main/resources/reference.conf#cluster-client-mailbox-config
|
||||
|
||||
An Example
|
||||
----------
|
||||
|
||||
|
|
|
|||
|
|
@ -42,3 +42,12 @@ akka.contrib.cluster.receptionist {
|
|||
response-tunnel-receive-timeout = 30s
|
||||
}
|
||||
# //#receptionist-ext-config
|
||||
|
||||
# //#cluster-client-mailbox-config
|
||||
akka.contrib.cluster.client {
|
||||
mailbox {
|
||||
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
|
||||
stash-capacity = 1000
|
||||
}
|
||||
}
|
||||
# //#cluster-client-mailbox-config
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import akka.cluster.Member
|
|||
import akka.cluster.MemberStatus
|
||||
import akka.routing.ConsistentHash
|
||||
import akka.routing.MurmurHash
|
||||
import akka.actor.Stash
|
||||
|
||||
object ClusterClient {
|
||||
|
||||
|
|
@ -37,7 +38,8 @@ object ClusterClient {
|
|||
initialContacts: Set[ActorSelection],
|
||||
pingInterval: FiniteDuration = 3.second,
|
||||
refreshContactsInterval: FiniteDuration = 1.minute): Props =
|
||||
Props(classOf[ClusterClient], initialContacts, pingInterval, refreshContactsInterval)
|
||||
Props(classOf[ClusterClient], initialContacts, pingInterval, refreshContactsInterval).
|
||||
withMailbox("akka.contrib.cluster.client.mailbox")
|
||||
|
||||
/**
|
||||
* Java API: Factory method for `ClusterClient` [[akka.actor.Props]].
|
||||
|
|
@ -112,7 +114,7 @@ class ClusterClient(
|
|||
initialContacts: Set[ActorSelection],
|
||||
pingInterval: FiniteDuration,
|
||||
refreshContactsInterval: FiniteDuration)
|
||||
extends Actor with ActorLogging {
|
||||
extends Actor with Stash with ActorLogging {
|
||||
|
||||
import ClusterClient._
|
||||
import ClusterClient.Internal._
|
||||
|
|
@ -143,12 +145,13 @@ class ClusterClient(
|
|||
case ActorIdentity(_, Some(receptionist)) ⇒
|
||||
context watch receptionist
|
||||
log.info("Connected to [{}]", receptionist.path)
|
||||
unstashAll()
|
||||
context.become(active(receptionist))
|
||||
case ActorIdentity(_, None) ⇒ // ok, use another instead
|
||||
case PingTick ⇒ sendGetContacts()
|
||||
case Pong ⇒
|
||||
case RefreshContactsTick ⇒
|
||||
case msg ⇒ context.system.deadLetters forward msg
|
||||
case msg ⇒ stash()
|
||||
}
|
||||
|
||||
def active(receptionist: ActorRef): Actor.Receive = {
|
||||
|
|
|
|||
|
|
@ -103,11 +103,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
"communicate to actor on any node in cluster" in within(10 seconds) {
|
||||
runOn(client) {
|
||||
val c = system.actorOf(ClusterClient.props(initialContacts))
|
||||
|
||||
awaitAssert {
|
||||
c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
|
||||
expectMsg(1 second, "ack")
|
||||
}
|
||||
c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
|
||||
expectMsg("ack")
|
||||
}
|
||||
runOn(fourth) {
|
||||
expectMsg("hello")
|
||||
|
|
@ -133,6 +130,11 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
}
|
||||
//#server
|
||||
|
||||
runOn(host1, host2, host3) {
|
||||
awaitCount(4)
|
||||
}
|
||||
enterBarrier("services-replicated")
|
||||
|
||||
//#client
|
||||
runOn(client) {
|
||||
val c = system.actorOf(ClusterClient.props(initialContacts))
|
||||
|
|
@ -141,6 +143,11 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
}
|
||||
//#client
|
||||
|
||||
runOn(client) {
|
||||
// note that "hi" was sent to 2 "serviceB"
|
||||
receiveN(3).toSet must be(Set("hello", "hi"))
|
||||
}
|
||||
|
||||
{ //not used, only demo
|
||||
//#initialContacts
|
||||
val initialContacts = Set(
|
||||
|
|
@ -165,10 +172,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
runOn(client) {
|
||||
val c = system.actorOf(ClusterClient.props(initialContacts))
|
||||
|
||||
awaitAssert {
|
||||
c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true)
|
||||
expectMsg(1 second, "ack")
|
||||
}
|
||||
c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true)
|
||||
expectMsg("ack")
|
||||
val lastSenderAddress = lastSender.path.address
|
||||
val receptionistRoleName = roleName(lastSenderAddress) match {
|
||||
case Some(r) ⇒ r
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue