stop ClusterClient ResponseTunnel after first reply when ask is used, #22093

This commit is contained in:
Patrik Nordwall 2017-01-03 16:42:31 +01:00
parent d1e255f42f
commit b213a7af20
2 changed files with 33 additions and 7 deletions

View file

@ -812,12 +812,21 @@ object ClusterReceptionist {
*/
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
context.setReceiveTimeout(timeout)
private val isAsk = {
val pathElements = client.path.elements
pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")
}
def receive = {
case Ping // keep alive from client
case ReceiveTimeout
log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
context stop self
case msg client.tell(msg, Actor.noSender)
case msg
client.tell(msg, Actor.noSender)
if (isAsk)
context stop self
}
}
}

View file

@ -213,6 +213,24 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier("after-2")
}
"work with ask" in within(10 seconds) {
runOn(client) {
import akka.pattern.ask
import system.dispatcher
val c = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)), "ask-client")
implicit val timeout = Timeout(remaining)
val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true)
Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack")
system.stop(c)
}
runOn(fourth) {
expectMsg("hello-request")
}
enterBarrier("after-3")
}
"demonstrate usage" in within(15 seconds) {
def host1 = first
def host2 = second
@ -261,7 +279,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
// strange, barriers fail without this sleep
Thread.sleep(1000)
enterBarrier("after-3")
enterBarrier("after-4")
}
"report events" in within(15 seconds) {
@ -305,7 +323,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
}
}
enterBarrier("after-6")
enterBarrier("after-5")
}
"re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
@ -356,7 +374,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
}
}
}
enterBarrier("after-4")
enterBarrier("after-6")
}
"re-establish connection to receptionist after partition" in within(30 seconds) {
@ -397,7 +415,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
system.stop(c)
}
enterBarrier("after-5")
enterBarrier("after-7")
}
"re-establish connection to receptionist after server restart" in within(30 seconds) {
@ -436,8 +454,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
system.name,
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
else s"akka.remote.netty.tcp.port=$port"
).withFallback(system.settings.config))
else s"akka.remote.netty.tcp.port=$port").withFallback(system.settings.config))
Cluster(sys2).join(Cluster(sys2).selfAddress)
val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
ClusterClientReceptionist(sys2).registerService(service2)