=rem #16224: Not terminate connections if quarantine id do not match
This commit is contained in:
parent
7aa9fe25f2
commit
a6e10f154f
5 changed files with 403 additions and 28 deletions
|
|
@ -0,0 +1,131 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
import akka.actor.Actor
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.actor.Address
|
||||||
|
import akka.actor.Deploy
|
||||||
|
import akka.actor.Props
|
||||||
|
import akka.actor.RootActorPath
|
||||||
|
import akka.cluster.ClusterEvent._
|
||||||
|
import akka.cluster.MemberStatus._
|
||||||
|
import akka.remote.AddressUidExtension
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.testkit._
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
object RestartNode2SpecMultiJvmSpec extends MultiNodeConfig {
|
||||||
|
val seed1 = role("seed1")
|
||||||
|
val seed2 = role("seed2")
|
||||||
|
|
||||||
|
commonConfig(debugConfig(on = false).
|
||||||
|
withFallback(ConfigFactory.parseString("""
|
||||||
|
akka.cluster.auto-down-unreachable-after = 2s
|
||||||
|
akka.cluster.retry-unsuccessful-join-after = 3s
|
||||||
|
akka.remote.retry-gate-closed-for = 45s
|
||||||
|
akka.remote.log-remote-lifecycle-events = INFO
|
||||||
|
""")).
|
||||||
|
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class RestartNode2SpecMultiJvmNode1 extends RestartNode2SpecSpec
|
||||||
|
class RestartNode2SpecMultiJvmNode2 extends RestartNode2SpecSpec
|
||||||
|
|
||||||
|
abstract class RestartNode2SpecSpec
|
||||||
|
extends MultiNodeSpec(RestartNode2SpecMultiJvmSpec)
|
||||||
|
with MultiNodeClusterSpec with ImplicitSender {
|
||||||
|
|
||||||
|
import RestartNode2SpecMultiJvmSpec._
|
||||||
|
|
||||||
|
@volatile var seedNode1Address: Address = _
|
||||||
|
|
||||||
|
// use a separate ActorSystem, to be able to simulate restart
|
||||||
|
lazy val seed1System = ActorSystem(system.name, system.settings.config)
|
||||||
|
|
||||||
|
def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2)
|
||||||
|
|
||||||
|
// this is the node that will attempt to re-join, keep gate times low so it can retry quickly
|
||||||
|
lazy val restartedSeed1System = ActorSystem(system.name,
|
||||||
|
ConfigFactory.parseString(
|
||||||
|
s"""
|
||||||
|
akka.remote.netty.tcp.port= ${seedNodes.head.port.get}
|
||||||
|
#akka.remote.retry-gate-closed-for = 1s
|
||||||
|
""").
|
||||||
|
withFallback(system.settings.config))
|
||||||
|
|
||||||
|
override def afterAll(): Unit = {
|
||||||
|
runOn(seed1) {
|
||||||
|
shutdown(
|
||||||
|
if (seed1System.whenTerminated.isCompleted) restartedSeed1System else seed1System)
|
||||||
|
}
|
||||||
|
super.afterAll()
|
||||||
|
}
|
||||||
|
|
||||||
|
"Cluster seed nodes" must {
|
||||||
|
"be able to restart first seed node and join other seed nodes" taggedAs LongRunningTest in within(60.seconds) {
|
||||||
|
// seed1System is a separate ActorSystem, to be able to simulate restart
|
||||||
|
// we must transfer its address to seed2
|
||||||
|
runOn(seed2) {
|
||||||
|
system.actorOf(Props(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case a: Address ⇒
|
||||||
|
seedNode1Address = a
|
||||||
|
sender() ! "ok"
|
||||||
|
}
|
||||||
|
}).withDeploy(Deploy.local), name = "address-receiver")
|
||||||
|
enterBarrier("seed1-address-receiver-ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(seed1) {
|
||||||
|
enterBarrier("seed1-address-receiver-ready")
|
||||||
|
seedNode1Address = Cluster(seed1System).selfAddress
|
||||||
|
List(seed2) foreach { r ⇒
|
||||||
|
system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! seedNode1Address
|
||||||
|
expectMsg(5.seconds, "ok")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
enterBarrier("seed1-address-transfered")
|
||||||
|
|
||||||
|
// now we can join seed1System, seed2 together
|
||||||
|
|
||||||
|
runOn(seed1) {
|
||||||
|
Cluster(seed1System).joinSeedNodes(seedNodes)
|
||||||
|
awaitAssert(Cluster(seed1System).readView.members.size should be(2))
|
||||||
|
awaitAssert(Cluster(seed1System).readView.members.map(_.status) should be(Set(Up)))
|
||||||
|
}
|
||||||
|
runOn(seed2) {
|
||||||
|
cluster.joinSeedNodes(seedNodes)
|
||||||
|
awaitMembersUp(2)
|
||||||
|
}
|
||||||
|
enterBarrier("started")
|
||||||
|
|
||||||
|
// shutdown seed1System
|
||||||
|
runOn(seed1) {
|
||||||
|
shutdown(seed1System, remainingOrDefault)
|
||||||
|
}
|
||||||
|
enterBarrier("seed1-shutdown")
|
||||||
|
|
||||||
|
// then start restartedSeed1System, which has the same address as seed1System
|
||||||
|
runOn(seed1) {
|
||||||
|
Cluster(restartedSeed1System).joinSeedNodes(seedNodes)
|
||||||
|
within(30.seconds) {
|
||||||
|
awaitAssert(Cluster(restartedSeed1System).readView.members.size should be(2))
|
||||||
|
awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) should be(Set(Up)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
runOn(seed2) {
|
||||||
|
awaitMembersUp(2)
|
||||||
|
}
|
||||||
|
enterBarrier("seed1-restarted")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,108 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.remote
|
||||||
|
|
||||||
|
import akka.remote.transport.AssociationHandle
|
||||||
|
|
||||||
|
import language.postfixOps
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import akka.actor._
|
||||||
|
import akka.remote.testconductor.RoleName
|
||||||
|
import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociateExplicitly, ForceDisassociate, Direction }
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.remote.testkit.STMultiNodeSpec
|
||||||
|
import akka.testkit._
|
||||||
|
import akka.actor.ActorIdentity
|
||||||
|
import akka.remote.testconductor.RoleName
|
||||||
|
import akka.actor.Identify
|
||||||
|
import scala.concurrent.Await
|
||||||
|
|
||||||
|
object RemoteGatePiercingSpec extends MultiNodeConfig {
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
|
||||||
|
commonConfig(debugConfig(on = false).withFallback(
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.remote.log-remote-lifecycle-events = INFO
|
||||||
|
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 5 s
|
||||||
|
""")))
|
||||||
|
|
||||||
|
nodeConfig(first)(
|
||||||
|
ConfigFactory.parseString("akka.remote.retry-gate-closed-for = 1 d # Keep it long"))
|
||||||
|
|
||||||
|
nodeConfig(second)(
|
||||||
|
ConfigFactory.parseString("akka.remote.retry-gate-closed-for = 1 s # Keep it short"))
|
||||||
|
|
||||||
|
testTransport(on = true)
|
||||||
|
|
||||||
|
class Subject extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case "shutdown" ⇒ context.system.terminate()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteGatePiercingSpecMultiJvmNode1 extends RemoteGatePiercingSpec
|
||||||
|
class RemoteGatePiercingSpecMultiJvmNode2 extends RemoteGatePiercingSpec
|
||||||
|
|
||||||
|
abstract class RemoteGatePiercingSpec
|
||||||
|
extends MultiNodeSpec(RemoteGatePiercingSpec)
|
||||||
|
with STMultiNodeSpec with ImplicitSender {
|
||||||
|
|
||||||
|
import RemoteGatePiercingSpec._
|
||||||
|
|
||||||
|
override def initialParticipants = 2
|
||||||
|
|
||||||
|
def identify(role: RoleName, actorName: String): ActorRef = {
|
||||||
|
system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName)
|
||||||
|
expectMsgType[ActorIdentity].ref.get
|
||||||
|
}
|
||||||
|
|
||||||
|
"RemoteNodeRestartGate" must {
|
||||||
|
|
||||||
|
"allow restarted node to pass through gate" taggedAs LongRunningTest in {
|
||||||
|
runOn(first) {
|
||||||
|
system.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
identify(second, "subject")
|
||||||
|
|
||||||
|
enterBarrier("actors-communicate")
|
||||||
|
|
||||||
|
EventFilter.warning(pattern = "address is now gated", occurrences = 1).intercept {
|
||||||
|
Await.result(RARP(system).provider.transport.managementCommand(
|
||||||
|
ForceDisassociateExplicitly(node(second).address, AssociationHandle.Unknown)), 3.seconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("gated")
|
||||||
|
|
||||||
|
enterBarrier("gate-pierced")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(second) {
|
||||||
|
system.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
enterBarrier("actors-communicate")
|
||||||
|
|
||||||
|
enterBarrier("gated")
|
||||||
|
|
||||||
|
// Pierce the gate
|
||||||
|
within(30.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
identify(first, "subject")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("gate-pierced")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,126 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.remote
|
||||||
|
|
||||||
|
import akka.remote.transport.AssociationHandle
|
||||||
|
|
||||||
|
import language.postfixOps
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import akka.actor._
|
||||||
|
import akka.remote.testconductor.RoleName
|
||||||
|
import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociateExplicitly, ForceDisassociate, Direction }
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.remote.testkit.STMultiNodeSpec
|
||||||
|
import akka.testkit._
|
||||||
|
import akka.actor.ActorIdentity
|
||||||
|
import akka.remote.testconductor.RoleName
|
||||||
|
import akka.actor.Identify
|
||||||
|
import scala.concurrent.Await
|
||||||
|
|
||||||
|
object RemoteNodeRestartGateSpec extends MultiNodeConfig {
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
|
||||||
|
commonConfig(debugConfig(on = false).withFallback(
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.remote.log-remote-lifecycle-events = INFO
|
||||||
|
akka.remote.retry-gate-closed-for = 1d # Keep it long
|
||||||
|
""")))
|
||||||
|
|
||||||
|
testTransport(on = true)
|
||||||
|
|
||||||
|
class Subject extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case "shutdown" ⇒ context.system.terminate()
|
||||||
|
case msg ⇒ sender() ! msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteNodeRestartGateSpecMultiJvmNode1 extends RemoteNodeRestartGateSpec
|
||||||
|
class RemoteNodeRestartGateSpecMultiJvmNode2 extends RemoteNodeRestartGateSpec
|
||||||
|
|
||||||
|
abstract class RemoteNodeRestartGateSpec
|
||||||
|
extends MultiNodeSpec(RemoteNodeRestartGateSpec)
|
||||||
|
with STMultiNodeSpec with ImplicitSender {
|
||||||
|
|
||||||
|
import RemoteNodeRestartGateSpec._
|
||||||
|
|
||||||
|
override def initialParticipants = 2
|
||||||
|
|
||||||
|
def identify(role: RoleName, actorName: String): ActorRef = {
|
||||||
|
system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName)
|
||||||
|
expectMsgType[ActorIdentity].ref.get
|
||||||
|
}
|
||||||
|
|
||||||
|
"RemoteNodeRestartGate" must {
|
||||||
|
|
||||||
|
"allow restarted node to pass through gate" taggedAs LongRunningTest in {
|
||||||
|
runOn(first) {
|
||||||
|
val secondAddress = node(second).address
|
||||||
|
system.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
identify(second, "subject")
|
||||||
|
|
||||||
|
EventFilter.warning(pattern = "address is now gated", occurrences = 1).intercept {
|
||||||
|
Await.result(RARP(system).provider.transport.managementCommand(
|
||||||
|
ForceDisassociateExplicitly(node(second).address, AssociationHandle.Unknown)), 3.seconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
enterBarrier("gated")
|
||||||
|
|
||||||
|
testConductor.shutdown(second).await
|
||||||
|
|
||||||
|
within(10.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject")
|
||||||
|
expectMsgType[ActorIdentity].ref.get
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "shutdown"
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(second) {
|
||||||
|
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
|
val firstAddress = node(first).address
|
||||||
|
|
||||||
|
system.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
enterBarrier("gated")
|
||||||
|
|
||||||
|
Await.ready(system.whenTerminated, 10.seconds)
|
||||||
|
|
||||||
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
|
akka.remote.retry-gate-closed-for = 0.5 s
|
||||||
|
akka.remote.netty.tcp {
|
||||||
|
hostname = ${addr.host.get}
|
||||||
|
port = ${addr.port.get}
|
||||||
|
}
|
||||||
|
""").withFallback(system.settings.config))
|
||||||
|
|
||||||
|
val probe = TestProbe()(freshSystem)
|
||||||
|
|
||||||
|
// Pierce the gate
|
||||||
|
within(30.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
freshSystem.actorSelection(RootActorPath(firstAddress) / "user" / "subject").tell(Identify("subject"), probe.ref)
|
||||||
|
probe.expectMsgType[ActorIdentity].ref.get
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now the other system will be able to pass, too
|
||||||
|
freshSystem.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
|
Await.ready(freshSystem.whenTerminated, 30.seconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -293,7 +293,7 @@ private[remote] object EndpointManager {
|
||||||
class EndpointRegistry {
|
class EndpointRegistry {
|
||||||
private var addressToWritable = HashMap[Address, EndpointPolicy]()
|
private var addressToWritable = HashMap[Address, EndpointPolicy]()
|
||||||
private var writableToAddress = HashMap[ActorRef, Address]()
|
private var writableToAddress = HashMap[ActorRef, Address]()
|
||||||
private var addressToReadonly = HashMap[Address, ActorRef]()
|
private var addressToReadonly = HashMap[Address, (ActorRef, Int)]()
|
||||||
private var readonlyToAddress = HashMap[ActorRef, Address]()
|
private var readonlyToAddress = HashMap[ActorRef, Address]()
|
||||||
|
|
||||||
def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef =
|
def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef =
|
||||||
|
|
@ -313,8 +313,8 @@ private[remote] object EndpointManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = {
|
def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef, uid: Int): ActorRef = {
|
||||||
addressToReadonly += address -> endpoint
|
addressToReadonly += address -> ((endpoint, uid))
|
||||||
readonlyToAddress += endpoint -> address
|
readonlyToAddress += endpoint -> address
|
||||||
endpoint
|
endpoint
|
||||||
}
|
}
|
||||||
|
|
@ -339,7 +339,7 @@ private[remote] object EndpointManager {
|
||||||
case _ ⇒ false
|
case _ ⇒ false
|
||||||
}
|
}
|
||||||
|
|
||||||
def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address)
|
def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address)
|
||||||
|
|
||||||
def isWritable(endpoint: ActorRef): Boolean = writableToAddress contains endpoint
|
def isWritable(endpoint: ActorRef): Boolean = writableToAddress contains endpoint
|
||||||
|
|
||||||
|
|
@ -533,25 +533,30 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
}
|
}
|
||||||
Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender()
|
Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender()
|
||||||
|
|
||||||
case Quarantine(address, uidOption) ⇒
|
case Quarantine(address, uidToQuarantineOption) ⇒
|
||||||
// Stop writers
|
// Stop writers
|
||||||
endpoints.writableEndpointWithPolicyFor(address) match {
|
(endpoints.writableEndpointWithPolicyFor(address), uidToQuarantineOption) match {
|
||||||
case Some(Pass(endpoint, _, _)) ⇒
|
case (Some(Pass(endpoint, _, _)), None) ⇒
|
||||||
context.stop(endpoint)
|
context.stop(endpoint)
|
||||||
if (uidOption.isEmpty) {
|
log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
|
||||||
log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
|
"address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||||
"address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
address, settings.RetryGateClosedFor.toMillis)
|
||||||
address, settings.RetryGateClosedFor.toMillis)
|
endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)
|
||||||
endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)
|
case (Some(Pass(endpoint, Some(currentUid), _)), Some(quarantineUid)) if currentUid == quarantineUid ⇒
|
||||||
}
|
context.stop(endpoint)
|
||||||
|
case _ ⇒
|
||||||
|
// Do nothing, because either:
|
||||||
|
// A: we don't know yet the UID of the writer, it will be checked against current quarantine state later
|
||||||
|
// B: we know the UID, but it does not match with the UID to be quarantined
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop inbound read-only associations
|
||||||
|
(endpoints.readOnlyEndpointFor(address), uidToQuarantineOption) match {
|
||||||
|
case (Some((endpoint, _)), None) ⇒ context.stop(endpoint)
|
||||||
|
case (Some((endpoint, currentUid)), Some(quarantineUid)) if currentUid == quarantineUid ⇒ context.stop(endpoint)
|
||||||
case _ ⇒ // nothing to stop
|
case _ ⇒ // nothing to stop
|
||||||
}
|
}
|
||||||
// Stop inbound read-only associations
|
uidToQuarantineOption foreach { uid ⇒
|
||||||
endpoints.readOnlyEndpointFor(address) match {
|
|
||||||
case Some(endpoint) ⇒ context.stop(endpoint)
|
|
||||||
case _ ⇒ // nothing to stop
|
|
||||||
}
|
|
||||||
uidOption foreach { uid ⇒
|
|
||||||
endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration)
|
endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration)
|
||||||
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
|
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
|
||||||
}
|
}
|
||||||
|
|
@ -633,10 +638,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
|
|
||||||
def handleInboundAssociation(ia: InboundAssociation): Unit = ia match {
|
def handleInboundAssociation(ia: InboundAssociation): Unit = ia match {
|
||||||
case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
|
case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
|
||||||
case Some(endpoint) ⇒
|
case Some((endpoint, _)) ⇒
|
||||||
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
|
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
|
||||||
pendingReadHandoffs += endpoint -> handle
|
pendingReadHandoffs += endpoint -> handle
|
||||||
endpoint ! EndpointWriter.TakeOver(handle, self)
|
endpoint ! EndpointWriter.TakeOver(handle, self)
|
||||||
|
endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
|
||||||
|
case Some(Pass(ep, _, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
case None ⇒
|
case None ⇒
|
||||||
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
|
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
|
||||||
handle.disassociate(AssociationHandle.Quarantined)
|
handle.disassociate(AssociationHandle.Quarantined)
|
||||||
|
|
@ -648,6 +657,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
pendingReadHandoffs.get(ep) foreach (_.disassociate())
|
pendingReadHandoffs.get(ep) foreach (_.disassociate())
|
||||||
pendingReadHandoffs += ep -> handle
|
pendingReadHandoffs += ep -> handle
|
||||||
ep ! EndpointWriter.StopReading(ep, self)
|
ep ! EndpointWriter.StopReading(ep, self)
|
||||||
|
ep ! ReliableDeliverySupervisor.Ungate
|
||||||
} else {
|
} else {
|
||||||
context.stop(ep)
|
context.stop(ep)
|
||||||
endpoints.unregisterEndpoint(ep)
|
endpoints.unregisterEndpoint(ep)
|
||||||
|
|
@ -674,7 +684,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
if (writing)
|
if (writing)
|
||||||
endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint)
|
endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint)
|
||||||
else {
|
else {
|
||||||
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
|
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid)
|
||||||
if (!endpoints.hasWritableEndpointFor(handle.remoteAddress))
|
if (!endpoints.hasWritableEndpointFor(handle.remoteAddress))
|
||||||
endpoints.removePolicy(handle.remoteAddress)
|
endpoints.removePolicy(handle.remoteAddress)
|
||||||
}
|
}
|
||||||
|
|
@ -740,7 +750,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
Some(handle),
|
Some(handle),
|
||||||
writing = false,
|
writing = false,
|
||||||
refuseUid = None)
|
refuseUid = None)
|
||||||
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
|
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,10 +34,10 @@ class EndpointRegistrySpec extends AkkaSpec {
|
||||||
val reg = new EndpointRegistry
|
val reg = new EndpointRegistry
|
||||||
reg.readOnlyEndpointFor(address1) should ===(None)
|
reg.readOnlyEndpointFor(address1) should ===(None)
|
||||||
|
|
||||||
reg.registerReadOnlyEndpoint(address1, actorA) should ===(actorA)
|
reg.registerReadOnlyEndpoint(address1, actorA, 0) should ===(actorA)
|
||||||
|
|
||||||
reg.readOnlyEndpointFor(address1) should ===(Some(actorA))
|
reg.readOnlyEndpointFor(address1) should ===(Some((actorA, 0)))
|
||||||
reg.writableEndpointWithPolicyFor(address1) should ===(None)
|
reg.writableEndpointWithPolicyFor(address1) should be(None)
|
||||||
reg.isWritable(actorA) should ===(false)
|
reg.isWritable(actorA) should ===(false)
|
||||||
reg.isReadOnly(actorA) should ===(true)
|
reg.isReadOnly(actorA) should ===(true)
|
||||||
reg.isQuarantined(address1, 42) should ===(false)
|
reg.isQuarantined(address1, 42) should ===(false)
|
||||||
|
|
@ -48,10 +48,10 @@ class EndpointRegistrySpec extends AkkaSpec {
|
||||||
reg.readOnlyEndpointFor(address1) should ===(None)
|
reg.readOnlyEndpointFor(address1) should ===(None)
|
||||||
reg.writableEndpointWithPolicyFor(address1) should ===(None)
|
reg.writableEndpointWithPolicyFor(address1) should ===(None)
|
||||||
|
|
||||||
reg.registerReadOnlyEndpoint(address1, actorA) should ===(actorA)
|
reg.registerReadOnlyEndpoint(address1, actorA, 1) should ===(actorA)
|
||||||
reg.registerWritableEndpoint(address1, None, None, actorB) should ===(actorB)
|
reg.registerWritableEndpoint(address1, None, None, actorB) should ===(actorB)
|
||||||
|
|
||||||
reg.readOnlyEndpointFor(address1) should ===(Some(actorA))
|
reg.readOnlyEndpointFor(address1) should ===(Some((actorA, 1)))
|
||||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None, None)))
|
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None, None)))
|
||||||
|
|
||||||
reg.isWritable(actorA) should ===(false)
|
reg.isWritable(actorA) should ===(false)
|
||||||
|
|
@ -77,7 +77,7 @@ class EndpointRegistrySpec extends AkkaSpec {
|
||||||
"remove read-only endpoints if marked as failed" in {
|
"remove read-only endpoints if marked as failed" in {
|
||||||
val reg = new EndpointRegistry
|
val reg = new EndpointRegistry
|
||||||
|
|
||||||
reg.registerReadOnlyEndpoint(address1, actorA)
|
reg.registerReadOnlyEndpoint(address1, actorA, 2)
|
||||||
reg.markAsFailed(actorA, Deadline.now)
|
reg.markAsFailed(actorA, Deadline.now)
|
||||||
reg.readOnlyEndpointFor(address1) should ===(None)
|
reg.readOnlyEndpointFor(address1) should ===(None)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue