This commit is contained in:
parent
73d3c5db5d
commit
eb24033cc0
3 changed files with 239 additions and 5 deletions
|
|
@ -14,7 +14,7 @@ import akka.remote.testconductor.RoleName
|
|||
import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec }
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.actor.{ ActorSystem, Address }
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, PoisonPill, Props, RootActorPath }
|
||||
import akka.event.Logging.ErrorLevel
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -22,9 +22,9 @@ import scala.collection.immutable
|
|||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import akka.remote.DefaultFailureDetectorRegistry
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Actor
|
||||
import akka.actor.RootActorPath
|
||||
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberEvent, MemberExited, MemberRemoved }
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
||||
object MultiNodeClusterSpec {
|
||||
|
||||
|
|
@ -312,6 +312,46 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
|
|||
}
|
||||
}
|
||||
|
||||
def awaitMemberRemoved(toBeRemovedAddress: Address, timeout: FiniteDuration = 25.seconds): Unit = within(timeout) {
|
||||
if (toBeRemovedAddress == cluster.selfAddress) {
|
||||
enterBarrier("registered-listener")
|
||||
|
||||
cluster.leave(toBeRemovedAddress)
|
||||
enterBarrier("member-left")
|
||||
|
||||
awaitCond(cluster.isTerminated, remaining)
|
||||
enterBarrier("member-shutdown")
|
||||
} else {
|
||||
val exitingLatch = TestLatch()
|
||||
|
||||
val awaiter = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case MemberRemoved(m, _) if m.address == toBeRemovedAddress ⇒
|
||||
exitingLatch.countDown()
|
||||
case _ ⇒
|
||||
// ignore
|
||||
}
|
||||
}).withDeploy(Deploy.local))
|
||||
cluster.subscribe(awaiter, classOf[MemberEvent])
|
||||
enterBarrier("registered-listener")
|
||||
|
||||
// in the meantime member issues leave
|
||||
enterBarrier("member-left")
|
||||
|
||||
// verify that the member is EXITING
|
||||
try Await.result(exitingLatch, timeout) catch {
|
||||
case cause: Exception ⇒
|
||||
throw new AssertionError(s"Member ${toBeRemovedAddress} was not removed within ${timeout}!", cause)
|
||||
}
|
||||
awaiter ! PoisonPill // you've done your job, now die
|
||||
|
||||
enterBarrier("member-shutdown")
|
||||
markNodeAsUnavailable(toBeRemovedAddress)
|
||||
}
|
||||
|
||||
enterBarrier("member-totally-shutdown")
|
||||
}
|
||||
|
||||
def awaitAllReachable(): Unit =
|
||||
awaitAssert(clusterView.unreachableMembers should ===(Set.empty))
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue