diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index 17d7da0d17..165666caa9 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -812,12 +812,21 @@ object ClusterReceptionist { */ class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging { context.setReceiveTimeout(timeout) + + private val isAsk = { + val pathElements = client.path.elements + pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$") + } + def receive = { case Ping ⇒ // keep alive from client case ReceiveTimeout ⇒ log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path) context stop self - case msg ⇒ client.tell(msg, Actor.noSender) + case msg ⇒ + client.tell(msg, Actor.noSender) + if (isAsk) + context stop self } } } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index c0d04d110a..4fdca9011e 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -213,6 +213,24 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier("after-2") } + "work with ask" in within(10 seconds) { + runOn(client) { + import akka.pattern.ask + import system.dispatcher + val c = system.actorOf(ClusterClient.props( + ClusterClientSettings(system).withInitialContacts(initialContacts)), "ask-client") + implicit val timeout = Timeout(remaining) + val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true) + Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack") + system.stop(c) + } + runOn(fourth) { + expectMsg("hello-request") + } + + enterBarrier("after-3") + } + "demonstrate usage" in within(15 seconds) { def host1 = first def host2 = second @@ -261,7 +279,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod // strange, barriers fail without this sleep Thread.sleep(1000) - enterBarrier("after-3") + enterBarrier("after-4") } "report events" in within(15 seconds) { @@ -305,7 +323,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod } } - enterBarrier("after-6") + enterBarrier("after-5") } "re-establish connection to another receptionist when server is shutdown" in within(30 seconds) { @@ -356,7 +374,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod } } } - enterBarrier("after-4") + enterBarrier("after-6") } "re-establish connection to receptionist after partition" in within(30 seconds) { @@ -397,7 +415,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod system.stop(c) } - enterBarrier("after-5") + enterBarrier("after-7") } "re-establish connection to receptionist after server restart" in within(30 seconds) { @@ -436,8 +454,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod system.name, ConfigFactory.parseString( if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port" - else s"akka.remote.netty.tcp.port=$port" - ).withFallback(system.settings.config)) + else s"akka.remote.netty.tcp.port=$port").withFallback(system.settings.config)) Cluster(sys2).join(Cluster(sys2).selfAddress) val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2") ClusterClientReceptionist(sys2).registerService(service2)