fix up multi-jvm tests
This commit is contained in:
parent
52e4a18ebe
commit
b2ce64fb6f
9 changed files with 24 additions and 23 deletions
|
|
@ -34,7 +34,7 @@ abstract class ClusterAccrualFailureDetectorSpec
|
||||||
"receive heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
"receive heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
||||||
awaitClusterUp(first, second, third)
|
awaitClusterUp(first, second, third)
|
||||||
|
|
||||||
5.seconds.dilated.sleep // let them heartbeat
|
Thread.sleep(5.seconds.dilated.toMillis) // let them heartbeat
|
||||||
cluster.failureDetector.isAvailable(first) must be(true)
|
cluster.failureDetector.isAvailable(first) must be(true)
|
||||||
cluster.failureDetector.isAvailable(second) must be(true)
|
cluster.failureDetector.isAvailable(second) must be(true)
|
||||||
cluster.failureDetector.isAvailable(third) must be(true)
|
cluster.failureDetector.isAvailable(third) must be(true)
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ abstract class ConvergenceSpec
|
||||||
log.debug("assertNotMovedUp#" + n)
|
log.debug("assertNotMovedUp#" + n)
|
||||||
assertNotMovedUp
|
assertNotMovedUp
|
||||||
// wait and then check again
|
// wait and then check again
|
||||||
1.second.dilated.sleep
|
Thread.sleep(1.second.dilated.toMillis)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeConfig
|
||||||
import akka.remote.testkit.MultiNodeSpec
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import scala.concurrent.util.duration._
|
import scala.concurrent.util.duration._
|
||||||
import akka.util.Deadline
|
import scala.concurrent.util.Deadline
|
||||||
|
|
||||||
object JoinInProgressMultiJvmSpec extends MultiNodeConfig {
|
object JoinInProgressMultiJvmSpec extends MultiNodeConfig {
|
||||||
val first = role("first")
|
val first = role("first")
|
||||||
|
|
@ -54,7 +54,7 @@ abstract class JoinInProgressSpec
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
val until = Deadline.now + 5.seconds
|
val until = Deadline.now + 5.seconds
|
||||||
while (!until.isOverdue) {
|
while (!until.isOverdue) {
|
||||||
200.millis.sleep
|
Thread.sleep(200)
|
||||||
cluster.failureDetector.isAvailable(second) must be(true)
|
cluster.failureDetector.isAvailable(second) must be(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,10 +9,10 @@ import akka.remote.testkit.MultiNodeSpec
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import scala.concurrent.util.duration._
|
import scala.concurrent.util.duration._
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.util.Deadline
|
import scala.concurrent.util.Deadline
|
||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.TimeoutException
|
||||||
import scala.collection.immutable.SortedSet
|
import scala.collection.immutable.SortedSet
|
||||||
import akka.dispatch.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.util.Duration
|
import scala.concurrent.util.Duration
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import akka.remote.testconductor.RoleName
|
import akka.remote.testconductor.RoleName
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ abstract class NodeUpSpec
|
||||||
|
|
||||||
// let it run for a while to make sure that nothing bad happens
|
// let it run for a while to make sure that nothing bad happens
|
||||||
for (n ← 1 to 20) {
|
for (n ← 1 to 20) {
|
||||||
100.millis.dilated.sleep()
|
Thread.sleep(100.millis.dilated.toMillis)
|
||||||
unexpected.get must be(SortedSet.empty)
|
unexpected.get must be(SortedSet.empty)
|
||||||
cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true)
|
cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ abstract class SunnyWeatherSpec
|
||||||
awaitUpConvergence(roles.size)
|
awaitUpConvergence(roles.size)
|
||||||
assertLeaderIn(roles)
|
assertLeaderIn(roles)
|
||||||
if (n % 5 == 0) log.info("Passed period [{}]", n)
|
if (n % 5 == 0) log.info("Passed period [{}]", n)
|
||||||
1.seconds.sleep
|
Thread.sleep(1000)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after")
|
enterBarrier("after")
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ abstract class UnreachableNodeRejoinsClusterSpec
|
||||||
// let them send at least one heartbeat to each other after the gossip convergence
|
// let them send at least one heartbeat to each other after the gossip convergence
|
||||||
// because for new joining nodes we remove them from the failure detector when
|
// because for new joining nodes we remove them from the failure detector when
|
||||||
// receive gossip
|
// receive gossip
|
||||||
2.seconds.dilated.sleep
|
Thread.sleep(2.seconds.dilated.toMillis)
|
||||||
|
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
// pull network for victim node from all nodes
|
// pull network for victim node from all nodes
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
package akka.remote.testconductor
|
package akka.remote.testconductor
|
||||||
|
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
|
|
||||||
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props }
|
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props }
|
||||||
import RemoteConnection.getAddrString
|
import RemoteConnection.getAddrString
|
||||||
import TestConductorProtocol._
|
import TestConductorProtocol._
|
||||||
|
|
@ -23,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.util.{ Timeout }
|
import akka.util.{ Timeout }
|
||||||
import scala.concurrent.util.{ Deadline, Duration }
|
import scala.concurrent.util.{ Deadline, Duration }
|
||||||
|
import scala.reflect.classTag
|
||||||
|
|
||||||
sealed trait Direction {
|
sealed trait Direction {
|
||||||
def includes(other: Direction): Boolean
|
def includes(other: Direction): Boolean
|
||||||
|
|
@ -97,7 +97,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def sockAddr: Future[InetSocketAddress] = {
|
def sockAddr: Future[InetSocketAddress] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? GetSockAddr mapTo
|
controller ? GetSockAddr mapTo classTag[InetSocketAddress]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -120,7 +120,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done] = {
|
def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo
|
controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -135,7 +135,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? Throttle(node, target, direction, 0f) mapTo
|
controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -148,7 +148,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? Throttle(node, target, direction, -1f) mapTo
|
controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -161,7 +161,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def disconnect(node: RoleName, target: RoleName): Future[Done] = {
|
def disconnect(node: RoleName, target: RoleName): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? Disconnect(node, target, false) mapTo
|
controller ? Disconnect(node, target, false) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -174,7 +174,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def abort(node: RoleName, target: RoleName): Future[Done] = {
|
def abort(node: RoleName, target: RoleName): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? Disconnect(node, target, true) mapTo
|
controller ? Disconnect(node, target, true) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -187,7 +187,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def shutdown(node: RoleName, exitValue: Int): Future[Done] = {
|
def shutdown(node: RoleName, exitValue: Int): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? Terminate(node, exitValue) mapTo
|
controller ? Terminate(node, exitValue) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -198,7 +198,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
// TODO: uncomment (and implement in Controller) if really needed
|
// TODO: uncomment (and implement in Controller) if really needed
|
||||||
// def kill(node: RoleName): Future[Done] = {
|
// def kill(node: RoleName): Future[Done] = {
|
||||||
// import Settings.QueryTimeout
|
// import Settings.QueryTimeout
|
||||||
// controller ? Terminate(node, -1) mapTo
|
// controller ? Terminate(node, -1) mapTo classTag[Done]
|
||||||
// }
|
// }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -206,7 +206,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def getNodes: Future[Iterable[RoleName]] = {
|
def getNodes: Future[Iterable[RoleName]] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? GetNodes mapTo
|
controller ? GetNodes mapTo classTag[Iterable[RoleName]]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -219,7 +219,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def removeNode(node: RoleName): Future[Done] = {
|
def removeNode(node: RoleName): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
controller ? Remove(node) mapTo
|
controller ? Remove(node) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -239,7 +239,7 @@ private[akka] class ConductorHandler(_createTimeout: Timeout, controller: ActorR
|
||||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||||
val channel = event.getChannel
|
val channel = event.getChannel
|
||||||
log.debug("connection from {}", getAddrString(channel))
|
log.debug("connection from {}", getAddrString(channel))
|
||||||
val fsm: ActorRef = Await.result(controller ? Controller.CreateServerFSM(channel) mapTo, Duration.Inf)
|
val fsm: ActorRef = Await.result(controller ? Controller.CreateServerFSM(channel) mapTo classTag[ActorRef], Duration.Inf)
|
||||||
clients.put(channel, fsm)
|
clients.put(channel, fsm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import scala.concurrent.Future
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
import akka.event.{ LoggingAdapter, Logging }
|
import akka.event.{ LoggingAdapter, Logging }
|
||||||
import java.net.{ InetSocketAddress, ConnectException }
|
import java.net.{ InetSocketAddress, ConnectException }
|
||||||
|
import scala.reflect.classTag
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Player is the client component of the
|
* The Player is the client component of the
|
||||||
|
|
@ -62,7 +63,7 @@ trait Player { this: TestConductorExt ⇒
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
|
||||||
a ? client mapTo
|
a ? client mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -104,7 +105,7 @@ trait Player { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def getAddressFor(name: RoleName): Future[Address] = {
|
def getAddressFor(name: RoleName): Future[Address] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
client ? ToServer(GetAddress(name)) mapTo
|
client ? ToServer(GetAddress(name)) mapTo classTag[Address]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue