Merge pull request #22094 from akka/wip-22093-ClusterClient-ask-patriknw
stop ClusterClient ResponseTunnel after first reply when ask is used, #22093
This commit is contained in:
commit
1eba8656cd
2 changed files with 33 additions and 7 deletions
|
|
@ -812,12 +812,21 @@ object ClusterReceptionist {
|
|||
*/
|
||||
class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging {
|
||||
context.setReceiveTimeout(timeout)
|
||||
|
||||
private val isAsk = {
|
||||
val pathElements = client.path.elements
|
||||
pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$")
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Ping ⇒ // keep alive from client
|
||||
case ReceiveTimeout ⇒
|
||||
log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path)
|
||||
context stop self
|
||||
case msg ⇒ client.tell(msg, Actor.noSender)
|
||||
case msg ⇒
|
||||
client.tell(msg, Actor.noSender)
|
||||
if (isAsk)
|
||||
context stop self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -213,6 +213,24 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
enterBarrier("after-2")
|
||||
}
|
||||
|
||||
"work with ask" in within(10 seconds) {
|
||||
runOn(client) {
|
||||
import akka.pattern.ask
|
||||
import system.dispatcher
|
||||
val c = system.actorOf(ClusterClient.props(
|
||||
ClusterClientSettings(system).withInitialContacts(initialContacts)), "ask-client")
|
||||
implicit val timeout = Timeout(remaining)
|
||||
val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true)
|
||||
Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack")
|
||||
system.stop(c)
|
||||
}
|
||||
runOn(fourth) {
|
||||
expectMsg("hello-request")
|
||||
}
|
||||
|
||||
enterBarrier("after-3")
|
||||
}
|
||||
|
||||
"demonstrate usage" in within(15 seconds) {
|
||||
def host1 = first
|
||||
def host2 = second
|
||||
|
|
@ -261,7 +279,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
|
||||
// strange, barriers fail without this sleep
|
||||
Thread.sleep(1000)
|
||||
enterBarrier("after-3")
|
||||
enterBarrier("after-4")
|
||||
}
|
||||
|
||||
"report events" in within(15 seconds) {
|
||||
|
|
@ -305,7 +323,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
}
|
||||
}
|
||||
|
||||
enterBarrier("after-6")
|
||||
enterBarrier("after-5")
|
||||
}
|
||||
|
||||
"re-establish connection to another receptionist when server is shutdown" in within(30 seconds) {
|
||||
|
|
@ -356,7 +374,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("after-4")
|
||||
enterBarrier("after-6")
|
||||
}
|
||||
|
||||
"re-establish connection to receptionist after partition" in within(30 seconds) {
|
||||
|
|
@ -397,7 +415,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
system.stop(c)
|
||||
}
|
||||
|
||||
enterBarrier("after-5")
|
||||
enterBarrier("after-7")
|
||||
}
|
||||
|
||||
"re-establish connection to receptionist after server restart" in within(30 seconds) {
|
||||
|
|
@ -436,8 +454,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
|||
system.name,
|
||||
ConfigFactory.parseString(
|
||||
if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
|
||||
else s"akka.remote.netty.tcp.port=$port"
|
||||
).withFallback(system.settings.config))
|
||||
else s"akka.remote.netty.tcp.port=$port").withFallback(system.settings.config))
|
||||
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||
val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
|
||||
ClusterClientReceptionist(sys2).registerService(service2)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue