Place the address cache in MultiNodeClusterSpec, see #2201
This commit is contained in:
parent
e9a96afef8
commit
f8b7189885
2 changed files with 31 additions and 18 deletions
|
|
@ -11,6 +11,9 @@ import akka.remote.testkit.MultiNodeSpec
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import akka.actor.ActorPath
|
||||||
|
import akka.actor.RootActorPath
|
||||||
|
|
||||||
object MultiNodeClusterSpec {
|
object MultiNodeClusterSpec {
|
||||||
def clusterConfig: Config = ConfigFactory.parseString("""
|
def clusterConfig: Config = ConfigFactory.parseString("""
|
||||||
|
|
@ -33,6 +36,29 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
|
||||||
|
|
||||||
override def initialParticipants = roles.size
|
override def initialParticipants = roles.size
|
||||||
|
|
||||||
|
private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lookup the Address for the role.
|
||||||
|
* It is cached, which has the implication that stopping
|
||||||
|
* and then restarting a role (jvm) with another address is not
|
||||||
|
* supported.
|
||||||
|
*/
|
||||||
|
def address(role: RoleName): Address = {
|
||||||
|
cachedAddresses.get(role) match {
|
||||||
|
case null ⇒
|
||||||
|
val address = node(role).address
|
||||||
|
cachedAddresses.put(role, address)
|
||||||
|
address
|
||||||
|
case address ⇒ address
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* implicit conversion from RoleName to Address
|
||||||
|
*/
|
||||||
|
implicit def role2Address(role: RoleName): Address = address(role)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The cluster node instance. Needs to be lazily created.
|
* The cluster node instance. Needs to be lazily created.
|
||||||
*/
|
*/
|
||||||
|
|
@ -73,7 +99,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
|
||||||
}
|
}
|
||||||
testConductor.enter(roles.head.name + "-started")
|
testConductor.enter(roles.head.name + "-started")
|
||||||
if (roles.tail.contains(myself)) {
|
if (roles.tail.contains(myself)) {
|
||||||
cluster.join(node(roles.head).address)
|
cluster.join(roles.head)
|
||||||
}
|
}
|
||||||
if (upConvergence && roles.contains(myself)) {
|
if (upConvergence && roles.contains(myself)) {
|
||||||
awaitUpConvergence(numberOfMembers = roles.length)
|
awaitUpConvergence(numberOfMembers = roles.length)
|
||||||
|
|
@ -147,14 +173,11 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
|
||||||
*/
|
*/
|
||||||
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
|
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
|
||||||
import Member.addressOrdering
|
import Member.addressOrdering
|
||||||
def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address)
|
def compare(x: RoleName, y: RoleName) = addressOrdering.compare(address(x), address(y))
|
||||||
}
|
}
|
||||||
|
|
||||||
def roleName(address: Address): Option[RoleName] = {
|
def roleName(address: Address): Option[RoleName] = {
|
||||||
testConductor.getNodes.await.find(node(_).address == address)
|
testConductor.getNodes.await.find(node(_).address == address)
|
||||||
}
|
}
|
||||||
|
|
||||||
// implicit conversion from RoleName to Address
|
|
||||||
implicit def role2Address(role: RoleName): Address = node(role).address
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,14 +4,15 @@
|
||||||
package akka.remote.testkit
|
package akka.remote.testkit
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
|
|
||||||
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
|
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
|
||||||
|
|
||||||
import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem }
|
import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem }
|
||||||
import akka.dispatch.Await
|
import akka.dispatch.Await
|
||||||
import akka.dispatch.Await.Awaitable
|
import akka.dispatch.Await.Awaitable
|
||||||
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
|
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.util.{ NonFatal, Duration }
|
import akka.util.{ NonFatal, Duration }
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure the role names and participants of the test, including configuration settings.
|
* Configure the role names and participants of the test, including configuration settings.
|
||||||
|
|
@ -189,18 +190,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
|
||||||
* val serviceA = system.actorFor(node("master") / "user" / "serviceA")
|
* val serviceA = system.actorFor(node("master") / "user" / "serviceA")
|
||||||
* }}}
|
* }}}
|
||||||
*/
|
*/
|
||||||
def node(role: RoleName): ActorPath = {
|
def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await)
|
||||||
cachedRootActorPaths.get(role) match {
|
|
||||||
case null ⇒
|
|
||||||
val root = RootActorPath(testConductor.getAddressFor(role).await)
|
|
||||||
cachedRootActorPaths.put(role, root)
|
|
||||||
root
|
|
||||||
case root ⇒ root
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private val cachedRootActorPaths = new ConcurrentHashMap[RoleName, ActorPath]
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enrich `.await()` onto all Awaitables, using BarrierTimeout.
|
* Enrich `.await()` onto all Awaitables, using BarrierTimeout.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue