Merge branch 'master' into wip-2109-port-cluster-test-jboner
Conflicts: akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
This commit is contained in:
commit
c63a28f39a
31 changed files with 791 additions and 501 deletions
|
|
@ -42,27 +42,27 @@ class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVeri
|
||||||
"An Actor configured with a BalancingDispatcher" must {
|
"An Actor configured with a BalancingDispatcher" must {
|
||||||
"fail verification with a ConfigurationException if also configured with a RoundRobinRouter" in {
|
"fail verification with a ConfigurationException if also configured with a RoundRobinRouter" in {
|
||||||
intercept[ConfigurationException] {
|
intercept[ConfigurationException] {
|
||||||
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RoundRobinRouter(2)))
|
system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(2).withDispatcher("balancing-dispatcher")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"fail verification with a ConfigurationException if also configured with a BroadcastRouter" in {
|
"fail verification with a ConfigurationException if also configured with a BroadcastRouter" in {
|
||||||
intercept[ConfigurationException] {
|
intercept[ConfigurationException] {
|
||||||
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(BroadcastRouter(2)))
|
system.actorOf(Props[TestActor].withRouter(BroadcastRouter(2).withDispatcher("balancing-dispatcher")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"fail verification with a ConfigurationException if also configured with a RandomRouter" in {
|
"fail verification with a ConfigurationException if also configured with a RandomRouter" in {
|
||||||
intercept[ConfigurationException] {
|
intercept[ConfigurationException] {
|
||||||
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RandomRouter(2)))
|
system.actorOf(Props[TestActor].withRouter(RandomRouter(2).withDispatcher("balancing-dispatcher")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"fail verification with a ConfigurationException if also configured with a SmallestMailboxRouter" in {
|
"fail verification with a ConfigurationException if also configured with a SmallestMailboxRouter" in {
|
||||||
intercept[ConfigurationException] {
|
intercept[ConfigurationException] {
|
||||||
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(SmallestMailboxRouter(2)))
|
system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(2).withDispatcher("balancing-dispatcher")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"fail verification with a ConfigurationException if also configured with a ScatterGatherFirstCompletedRouter" in {
|
"fail verification with a ConfigurationException if also configured with a ScatterGatherFirstCompletedRouter" in {
|
||||||
intercept[ConfigurationException] {
|
intercept[ConfigurationException] {
|
||||||
system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 2, within = 2 seconds)))
|
system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 2, within = 2 seconds).withDispatcher("balancing-dispatcher")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"not fail verification with a ConfigurationException also not configured with a Router" in {
|
"not fail verification with a ConfigurationException also not configured with a Router" in {
|
||||||
|
|
|
||||||
|
|
@ -128,6 +128,35 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
|
||||||
current.routees.size must be(2)
|
current.routees.size must be(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"resize when busy" ignore {
|
||||||
|
|
||||||
|
val busy = new TestLatch(1)
|
||||||
|
|
||||||
|
val resizer = DefaultResizer(
|
||||||
|
lowerBound = 1,
|
||||||
|
upperBound = 3,
|
||||||
|
pressureThreshold = 0,
|
||||||
|
messagesPerResize = 1)
|
||||||
|
|
||||||
|
val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))).withDispatcher("bal-disp"))
|
||||||
|
|
||||||
|
val latch1 = new TestLatch(1)
|
||||||
|
router ! (latch1, busy)
|
||||||
|
Await.ready(latch1, 2 seconds)
|
||||||
|
|
||||||
|
val latch2 = new TestLatch(1)
|
||||||
|
router ! (latch2, busy)
|
||||||
|
Await.ready(latch2, 2 seconds)
|
||||||
|
|
||||||
|
val latch3 = new TestLatch(1)
|
||||||
|
router ! (latch3, busy)
|
||||||
|
Await.ready(latch3, 2 seconds)
|
||||||
|
|
||||||
|
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
|
||||||
|
|
||||||
|
busy.countDown()
|
||||||
|
}
|
||||||
|
|
||||||
"grow as needed under pressure" in {
|
"grow as needed under pressure" in {
|
||||||
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
// make sure the pool starts at the expected lower limit and grows to the upper as needed
|
||||||
// as influenced by the backlog of blocking pooled actors
|
// as influenced by the backlog of blocking pooled actors
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,8 @@ object Props {
|
||||||
* Props is a ActorRef configuration object, that is thread safe and fully sharable.
|
* Props is a ActorRef configuration object, that is thread safe and fully sharable.
|
||||||
* Used when creating new actors through; <code>ActorSystem.actorOf</code> and <code>ActorContext.actorOf</code>.
|
* Used when creating new actors through; <code>ActorSystem.actorOf</code> and <code>ActorContext.actorOf</code>.
|
||||||
*
|
*
|
||||||
|
* In case of providing code which creates the actual Actor instance, that must not return the same instance multiple times.
|
||||||
|
*
|
||||||
* Examples on Scala API:
|
* Examples on Scala API:
|
||||||
* {{{
|
* {{{
|
||||||
* val props = Props[MyActor]
|
* val props = Props[MyActor]
|
||||||
|
|
@ -145,6 +147,8 @@ case class Props(
|
||||||
/**
|
/**
|
||||||
* Returns a new Props with the specified creator set.
|
* Returns a new Props with the specified creator set.
|
||||||
*
|
*
|
||||||
|
* The creator must not return the same instance multiple times.
|
||||||
|
*
|
||||||
* Scala API.
|
* Scala API.
|
||||||
*/
|
*/
|
||||||
def withCreator(c: ⇒ Actor): Props = copy(creator = () ⇒ c)
|
def withCreator(c: ⇒ Actor): Props = copy(creator = () ⇒ c)
|
||||||
|
|
@ -152,6 +156,8 @@ case class Props(
|
||||||
/**
|
/**
|
||||||
* Returns a new Props with the specified creator set.
|
* Returns a new Props with the specified creator set.
|
||||||
*
|
*
|
||||||
|
* The creator must not return the same instance multiple times.
|
||||||
|
*
|
||||||
* Java API.
|
* Java API.
|
||||||
*/
|
*/
|
||||||
def withCreator(c: Creator[Actor]): Props = copy(creator = () ⇒ c.create)
|
def withCreator(c: Creator[Actor]): Props = copy(creator = () ⇒ c.create)
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,7 @@ object Await {
|
||||||
* WARNING: Blocking operation, use with caution.
|
* WARNING: Blocking operation, use with caution.
|
||||||
*
|
*
|
||||||
* @throws [[java.util.concurrent.TimeoutException]] if times out
|
* @throws [[java.util.concurrent.TimeoutException]] if times out
|
||||||
|
* @throws [[java.lang.Throwable]] (throws clause is Exception due to Java) if there was a problem
|
||||||
* @return The returned value as returned by Awaitable.result
|
* @return The returned value as returned by Awaitable.result
|
||||||
*/
|
*/
|
||||||
@throws(classOf[Exception])
|
@throws(classOf[Exception])
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,9 @@ trait Effect {
|
||||||
* A constructor/factory, takes no parameters but creates a new value of type T every call.
|
* A constructor/factory, takes no parameters but creates a new value of type T every call.
|
||||||
*/
|
*/
|
||||||
trait Creator[T] {
|
trait Creator[T] {
|
||||||
|
/**
|
||||||
|
* This method must return a different instance upon every call.
|
||||||
|
*/
|
||||||
def create(): T
|
def create(): T
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,10 +30,10 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
|
||||||
_path) {
|
_path) {
|
||||||
|
|
||||||
// verify that a BalancingDispatcher is not used with a Router
|
// verify that a BalancingDispatcher is not used with a Router
|
||||||
if (_system.dispatchers.isBalancingDispatcher(_props.dispatcher) && _props.routerConfig != NoRouter)
|
if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher))
|
||||||
throw new ConfigurationException(
|
throw new ConfigurationException(
|
||||||
"Configuration for actor [" + _path.toString +
|
"Configuration for actor [" + _path.toString +
|
||||||
"] is invalid - you can not use a 'BalancingDispatcher' together with any type of 'Router'")
|
"] is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.")
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CAUTION: RoutedActorRef is PROBLEMATIC
|
* CAUTION: RoutedActorRef is PROBLEMATIC
|
||||||
|
|
|
||||||
|
|
@ -1,90 +1,95 @@
|
||||||
///**
|
/**
|
||||||
// * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
// */
|
*/
|
||||||
//
|
|
||||||
//package akka.cluster
|
package akka.cluster
|
||||||
//
|
|
||||||
//import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
//import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
//import akka.remote.testkit.MultiNodeConfig
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
//import akka.remote.testkit.MultiNodeSpec
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
//import akka.testkit._
|
import akka.testkit._
|
||||||
//
|
|
||||||
//object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
|
object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
|
||||||
// val a1 = role("a1")
|
val a1 = role("a1")
|
||||||
// val a2 = role("a2")
|
val a2 = role("a2")
|
||||||
// val b1 = role("b1")
|
val b1 = role("b1")
|
||||||
// val b2 = role("b2")
|
val b2 = role("b2")
|
||||||
// val c1 = role("c1")
|
val c1 = role("c1")
|
||||||
// val c2 = role("c2")
|
val c2 = role("c2")
|
||||||
//
|
|
||||||
// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
//
|
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec
|
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec
|
||||||
//class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec
|
class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec
|
||||||
//class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec
|
class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec
|
||||||
//class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
|
class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
|
||||||
//class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
|
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
|
||||||
//class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
|
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
|
||||||
//
|
|
||||||
//abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||||
// import JoinTwoClustersMultiJvmSpec._
|
import JoinTwoClustersMultiJvmSpec._
|
||||||
//
|
|
||||||
// override def initialParticipants = 6
|
override def initialParticipants = 6
|
||||||
//
|
|
||||||
// after {
|
after {
|
||||||
// testConductor.enter("after")
|
testConductor.enter("after")
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// val a1Address = node(a1).address
|
lazy val a1Address = node(a1).address
|
||||||
// val b1Address = node(b1).address
|
lazy val b1Address = node(b1).address
|
||||||
// val c1Address = node(c1).address
|
lazy val c1Address = node(c1).address
|
||||||
//
|
|
||||||
// "Three different clusters (A, B and C)" must {
|
"Three different clusters (A, B and C)" must {
|
||||||
//
|
|
||||||
// "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
|
"be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
|
||||||
//
|
// make sure that the node-to-join is started before other join
|
||||||
// runOn(a1, a2) {
|
runOn(a1, b1, c1) {
|
||||||
// cluster.join(a1Address)
|
cluster
|
||||||
// }
|
}
|
||||||
// runOn(b1, b2) {
|
testConductor.enter("first-started")
|
||||||
// cluster.join(b1Address)
|
|
||||||
// }
|
runOn(a1, a2) {
|
||||||
// runOn(c1, c2) {
|
cluster.join(a1Address)
|
||||||
// cluster.join(c1Address)
|
}
|
||||||
// }
|
runOn(b1, b2) {
|
||||||
//
|
cluster.join(b1Address)
|
||||||
// awaitUpConvergence(numberOfMembers = 2)
|
}
|
||||||
//
|
runOn(c1, c2) {
|
||||||
// assertLeader(a1, a2)
|
cluster.join(c1Address)
|
||||||
// assertLeader(b1, b2)
|
}
|
||||||
// assertLeader(c1, c2)
|
|
||||||
//
|
awaitUpConvergence(numberOfMembers = 2)
|
||||||
// runOn(b2) {
|
|
||||||
// cluster.join(a1Address)
|
assertLeader(a1, a2)
|
||||||
// }
|
assertLeader(b1, b2)
|
||||||
//
|
assertLeader(c1, c2)
|
||||||
// runOn(a1, a2, b1, b2) {
|
|
||||||
// awaitUpConvergence(numberOfMembers = 4)
|
runOn(b2) {
|
||||||
// }
|
cluster.join(a1Address)
|
||||||
//
|
}
|
||||||
// assertLeader(a1, a2, b1, b2)
|
|
||||||
// assertLeader(c1, c2)
|
runOn(a1, a2, b1, b2) {
|
||||||
//
|
awaitUpConvergence(numberOfMembers = 4)
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in {
|
assertLeader(a1, a2, b1, b2)
|
||||||
//
|
assertLeader(c1, c2)
|
||||||
// runOn(b2) {
|
|
||||||
// cluster.join(c1Address)
|
}
|
||||||
// }
|
|
||||||
//
|
"be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in {
|
||||||
// awaitUpConvergence(numberOfMembers = 6)
|
|
||||||
//
|
runOn(b2) {
|
||||||
// assertLeader(a1, a2, b1, b2, c1, c2)
|
cluster.join(c1Address)
|
||||||
// }
|
}
|
||||||
// }
|
|
||||||
//
|
awaitUpConvergence(numberOfMembers = 6)
|
||||||
//}
|
|
||||||
|
assertLeader(a1, a2, b1, b2, c1, c2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,108 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.cluster
|
||||||
|
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.testkit._
|
||||||
|
|
||||||
|
object LeaderElectionMultiJvmSpec extends MultiNodeConfig {
|
||||||
|
val controller = role("controller")
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
val third = role("third")
|
||||||
|
val fourth = role("fourth")
|
||||||
|
|
||||||
|
commonConfig(debugConfig(on = false).
|
||||||
|
withFallback(ConfigFactory.parseString("""
|
||||||
|
akka.cluster.auto-down = off
|
||||||
|
""")).
|
||||||
|
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec
|
||||||
|
class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec
|
||||||
|
class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec
|
||||||
|
class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec
|
||||||
|
class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec
|
||||||
|
|
||||||
|
abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec {
|
||||||
|
import LeaderElectionMultiJvmSpec._
|
||||||
|
|
||||||
|
override def initialParticipants = 5
|
||||||
|
|
||||||
|
lazy val firstAddress = node(first).address
|
||||||
|
|
||||||
|
// sorted in the order used by the cluster
|
||||||
|
lazy val roles = Seq(first, second, third, fourth).sorted
|
||||||
|
|
||||||
|
"A cluster of four nodes" must {
|
||||||
|
|
||||||
|
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
|
||||||
|
// make sure that the node-to-join is started before other join
|
||||||
|
runOn(first) {
|
||||||
|
cluster
|
||||||
|
}
|
||||||
|
testConductor.enter("first-started")
|
||||||
|
|
||||||
|
if (mySelf != controller) {
|
||||||
|
cluster.join(firstAddress)
|
||||||
|
awaitUpConvergence(numberOfMembers = roles.size)
|
||||||
|
cluster.isLeader must be(mySelf == roles.head)
|
||||||
|
}
|
||||||
|
testConductor.enter("after")
|
||||||
|
}
|
||||||
|
|
||||||
|
def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = {
|
||||||
|
val currentRoles = roles.drop(alreadyShutdown)
|
||||||
|
currentRoles.size must be >= (2)
|
||||||
|
val leader = currentRoles.head
|
||||||
|
val aUser = currentRoles.last
|
||||||
|
|
||||||
|
mySelf match {
|
||||||
|
|
||||||
|
case `controller` ⇒
|
||||||
|
testConductor.enter("before-shutdown")
|
||||||
|
testConductor.shutdown(leader, 0)
|
||||||
|
testConductor.removeNode(leader)
|
||||||
|
testConductor.enter("after-shutdown", "after-down", "completed")
|
||||||
|
|
||||||
|
case `leader` ⇒
|
||||||
|
testConductor.enter("before-shutdown")
|
||||||
|
// this node will be shutdown by the controller and doesn't participate in more barriers
|
||||||
|
|
||||||
|
case `aUser` ⇒
|
||||||
|
val leaderAddress = node(leader).address
|
||||||
|
testConductor.enter("before-shutdown", "after-shutdown")
|
||||||
|
// user marks the shutdown leader as DOWN
|
||||||
|
cluster.down(leaderAddress)
|
||||||
|
testConductor.enter("after-down", "completed")
|
||||||
|
|
||||||
|
case _ if currentRoles.tail.contains(mySelf) ⇒
|
||||||
|
// remaining cluster nodes, not shutdown
|
||||||
|
testConductor.enter("before-shutdown", "after-shutdown", "after-down")
|
||||||
|
|
||||||
|
awaitUpConvergence(currentRoles.size - 1)
|
||||||
|
val nextExpectedLeader = currentRoles.tail.head
|
||||||
|
cluster.isLeader must be(mySelf == nextExpectedLeader)
|
||||||
|
|
||||||
|
testConductor.enter("completed")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
|
||||||
|
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
"be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in {
|
||||||
|
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,77 +1,83 @@
|
||||||
///**
|
/**
|
||||||
// * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
// */
|
*/
|
||||||
//package akka.cluster
|
package akka.cluster
|
||||||
//
|
|
||||||
//import scala.collection.immutable.SortedSet
|
import scala.collection.immutable.SortedSet
|
||||||
//import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
//import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
//import akka.remote.testkit.MultiNodeConfig
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
//import akka.remote.testkit.MultiNodeSpec
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
//import akka.testkit._
|
import akka.testkit._
|
||||||
//
|
|
||||||
//object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
|
object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
|
||||||
// val first = role("first")
|
val first = role("first")
|
||||||
// val second = role("second")
|
val second = role("second")
|
||||||
// val third = role("third")
|
val third = role("third")
|
||||||
//
|
|
||||||
// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
//
|
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
|
class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
|
||||||
//class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec
|
class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec
|
||||||
//class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec
|
class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec
|
||||||
//
|
|
||||||
//abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec)
|
abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec)
|
||||||
// with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||||
// import MembershipChangeListenerMultiJvmSpec._
|
import MembershipChangeListenerMultiJvmSpec._
|
||||||
//
|
|
||||||
// override def initialParticipants = 3
|
override def initialParticipants = 3
|
||||||
//
|
|
||||||
// after {
|
after {
|
||||||
// testConductor.enter("after")
|
testConductor.enter("after")
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// "A set of connected cluster systems" must {
|
lazy val firstAddress = node(first).address
|
||||||
//
|
lazy val secondAddress = node(second).address
|
||||||
// val firstAddress = node(first).address
|
|
||||||
// val secondAddress = node(second).address
|
"A set of connected cluster systems" must {
|
||||||
//
|
|
||||||
// "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
"(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
||||||
//
|
|
||||||
// runOn(first, second) {
|
// make sure that the node-to-join is started before other join
|
||||||
// cluster.join(firstAddress)
|
runOn(first) {
|
||||||
// val latch = TestLatch()
|
cluster
|
||||||
// cluster.registerListener(new MembershipChangeListener {
|
}
|
||||||
// def notify(members: SortedSet[Member]) {
|
testConductor.enter("first-started")
|
||||||
// if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
|
|
||||||
// latch.countDown()
|
runOn(first, second) {
|
||||||
// }
|
cluster.join(firstAddress)
|
||||||
// })
|
val latch = TestLatch()
|
||||||
// latch.await
|
cluster.registerListener(new MembershipChangeListener {
|
||||||
// cluster.convergence.isDefined must be(true)
|
def notify(members: SortedSet[Member]) {
|
||||||
// }
|
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
|
||||||
//
|
latch.countDown()
|
||||||
// }
|
}
|
||||||
//
|
})
|
||||||
// "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
latch.await
|
||||||
//
|
cluster.convergence.isDefined must be(true)
|
||||||
// runOn(third) {
|
}
|
||||||
// cluster.join(firstAddress)
|
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// val latch = TestLatch()
|
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
||||||
// cluster.registerListener(new MembershipChangeListener {
|
|
||||||
// def notify(members: SortedSet[Member]) {
|
runOn(third) {
|
||||||
// if (members.size == 3 && members.forall(_.status == MemberStatus.Up))
|
cluster.join(firstAddress)
|
||||||
// latch.countDown()
|
}
|
||||||
// }
|
|
||||||
// })
|
val latch = TestLatch()
|
||||||
// latch.await
|
cluster.registerListener(new MembershipChangeListener {
|
||||||
// cluster.convergence.isDefined must be(true)
|
def notify(members: SortedSet[Member]) {
|
||||||
//
|
if (members.size == 3 && members.forall(_.status == MemberStatus.Up))
|
||||||
// }
|
latch.countDown()
|
||||||
// }
|
}
|
||||||
//
|
})
|
||||||
//}
|
latch.await
|
||||||
|
cluster.convergence.isDefined must be(true)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,8 +45,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
|
||||||
*/
|
*/
|
||||||
def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) {
|
def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) {
|
||||||
nodesInCluster.length must not be (0)
|
nodesInCluster.length must not be (0)
|
||||||
import Member.addressOrdering
|
val expectedLeader = roleOfLeader(nodesInCluster)
|
||||||
val expectedLeader = nodesInCluster.map(role ⇒ (role, node(role).address)).sortBy(_._2).head._1
|
|
||||||
cluster.isLeader must be(ifNode(expectedLeader)(true)(false))
|
cluster.isLeader must be(ifNode(expectedLeader)(true)(false))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -66,6 +65,23 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
|
||||||
def awaitUpConvergence(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = {
|
def awaitUpConvergence(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = {
|
||||||
awaitCond(cluster.latestGossip.members.size == nrOfMembers)
|
awaitCond(cluster.latestGossip.members.size == nrOfMembers)
|
||||||
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
|
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
|
||||||
awaitCond(canNotBePartOfRing forall (address => !(cluster.latestGossip.members exists (_.address.port == address.port))))
|
awaitCond(canNotBePartOfRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address))))
|
||||||
|
}
|
||||||
|
|
||||||
|
def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
|
||||||
|
nodesInCluster.length must not be (0)
|
||||||
|
nodesInCluster.sorted.head
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sort the roles in the order used by the cluster.
|
||||||
|
*/
|
||||||
|
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
|
||||||
|
import Member.addressOrdering
|
||||||
|
def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address)
|
||||||
|
}
|
||||||
|
|
||||||
|
def roleName(address: Address): Option[RoleName] = {
|
||||||
|
testConductor.getNodes.await.find(node(_).address == address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,70 +1,76 @@
|
||||||
///**
|
/**
|
||||||
// * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
// */
|
*/
|
||||||
//package akka.cluster
|
package akka.cluster
|
||||||
//
|
|
||||||
//import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
//import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
//import akka.remote.testkit.MultiNodeConfig
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
//import akka.remote.testkit.MultiNodeSpec
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
//import akka.testkit._
|
import akka.testkit._
|
||||||
//
|
|
||||||
//object NodeMembershipMultiJvmSpec extends MultiNodeConfig {
|
object NodeMembershipMultiJvmSpec extends MultiNodeConfig {
|
||||||
// val first = role("first")
|
val first = role("first")
|
||||||
// val second = role("second")
|
val second = role("second")
|
||||||
// val third = role("third")
|
val third = role("third")
|
||||||
//
|
|
||||||
// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
//
|
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec
|
class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec
|
||||||
//class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec
|
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec
|
||||||
//class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec
|
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec
|
||||||
//
|
|
||||||
//abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||||
// import NodeMembershipMultiJvmSpec._
|
import NodeMembershipMultiJvmSpec._
|
||||||
//
|
|
||||||
// override def initialParticipants = 3
|
override def initialParticipants = 3
|
||||||
//
|
|
||||||
// after {
|
after {
|
||||||
// testConductor.enter("after")
|
testConductor.enter("after")
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// val firstAddress = node(first).address
|
lazy val firstAddress = node(first).address
|
||||||
// val secondAddress = node(second).address
|
lazy val secondAddress = node(second).address
|
||||||
// val thirdAddress = node(third).address
|
lazy val thirdAddress = node(third).address
|
||||||
//
|
|
||||||
// "A set of connected cluster systems" must {
|
"A set of connected cluster systems" must {
|
||||||
//
|
|
||||||
// "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
|
"(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
|
||||||
//
|
|
||||||
// runOn(first, second) {
|
// make sure that the node-to-join is started before other join
|
||||||
// cluster.join(firstAddress)
|
runOn(first) {
|
||||||
// awaitCond(cluster.latestGossip.members.size == 2)
|
cluster
|
||||||
// assertMembers(cluster.latestGossip.members, firstAddress, secondAddress)
|
}
|
||||||
// awaitCond {
|
testConductor.enter("first-started")
|
||||||
// cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
|
|
||||||
// }
|
runOn(first, second) {
|
||||||
// awaitCond(cluster.convergence.isDefined)
|
cluster.join(firstAddress)
|
||||||
// }
|
awaitCond(cluster.latestGossip.members.size == 2)
|
||||||
//
|
assertMembers(cluster.latestGossip.members, firstAddress, secondAddress)
|
||||||
// }
|
awaitCond {
|
||||||
//
|
cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
|
||||||
// "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
|
}
|
||||||
//
|
awaitCond(cluster.convergence.isDefined)
|
||||||
// runOn(third) {
|
}
|
||||||
// cluster.join(firstAddress)
|
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// awaitCond(cluster.latestGossip.members.size == 3)
|
"(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
|
||||||
// assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress)
|
|
||||||
// awaitCond {
|
runOn(third) {
|
||||||
// cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
|
cluster.join(firstAddress)
|
||||||
// }
|
}
|
||||||
// awaitCond(cluster.convergence.isDefined)
|
|
||||||
//
|
awaitCond(cluster.latestGossip.members.size == 3)
|
||||||
// }
|
assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress)
|
||||||
// }
|
awaitCond {
|
||||||
//
|
cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
|
||||||
//}
|
}
|
||||||
|
awaitCond(cluster.convergence.isDefined)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,74 +1,74 @@
|
||||||
///**
|
/**
|
||||||
// * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
// */
|
*/
|
||||||
//package akka.cluster
|
package akka.cluster
|
||||||
//
|
|
||||||
//import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
//import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
//import akka.remote.testkit.MultiNodeConfig
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
//import akka.remote.testkit.MultiNodeSpec
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
//import akka.testkit._
|
import akka.testkit._
|
||||||
//
|
|
||||||
//object NodeStartupMultiJvmSpec extends MultiNodeConfig {
|
object NodeStartupMultiJvmSpec extends MultiNodeConfig {
|
||||||
// val first = role("first")
|
val first = role("first")
|
||||||
// val second = role("second")
|
val second = role("second")
|
||||||
//
|
|
||||||
// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
//
|
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//class NodeStartupMultiJvmNode1 extends NodeStartupSpec
|
class NodeStartupMultiJvmNode1 extends NodeStartupSpec
|
||||||
//class NodeStartupMultiJvmNode2 extends NodeStartupSpec
|
class NodeStartupMultiJvmNode2 extends NodeStartupSpec
|
||||||
//
|
|
||||||
//abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||||
// import NodeStartupMultiJvmSpec._
|
import NodeStartupMultiJvmSpec._
|
||||||
//
|
|
||||||
// override def initialParticipants = 2
|
override def initialParticipants = 2
|
||||||
//
|
|
||||||
// after {
|
after {
|
||||||
// testConductor.enter("after")
|
testConductor.enter("after")
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// val firstAddress = node(first).address
|
lazy val firstAddress = node(first).address
|
||||||
// val secondAddress = node(second).address
|
lazy val secondAddress = node(second).address
|
||||||
//
|
|
||||||
// "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
|
"A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
|
||||||
//
|
|
||||||
// "be a singleton cluster when started up" taggedAs LongRunningTest in {
|
"be a singleton cluster when started up" taggedAs LongRunningTest in {
|
||||||
// runOn(first) {
|
runOn(first) {
|
||||||
// awaitCond(cluster.isSingletonCluster)
|
awaitCond(cluster.isSingletonCluster)
|
||||||
// // FIXME #2117 singletonCluster should reach convergence
|
// FIXME #2117 singletonCluster should reach convergence
|
||||||
// //awaitCond(cluster.convergence.isDefined)
|
//awaitCond(cluster.convergence.isDefined)
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// "be in 'Joining' phase when started up" taggedAs LongRunningTest in {
|
"be in 'Joining' phase when started up" taggedAs LongRunningTest in {
|
||||||
// runOn(first) {
|
runOn(first) {
|
||||||
// val members = cluster.latestGossip.members
|
val members = cluster.latestGossip.members
|
||||||
// members.size must be(1)
|
members.size must be(1)
|
||||||
//
|
|
||||||
// val joiningMember = members find (_.address == firstAddress)
|
val joiningMember = members find (_.address == firstAddress)
|
||||||
// joiningMember must not be (None)
|
joiningMember must not be (None)
|
||||||
// joiningMember.get.status must be(MemberStatus.Joining)
|
joiningMember.get.status must be(MemberStatus.Joining)
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// "A second cluster node" must {
|
"A second cluster node" must {
|
||||||
// "join the other node cluster when sending a Join command" taggedAs LongRunningTest in {
|
"join the other node cluster when sending a Join command" taggedAs LongRunningTest in {
|
||||||
//
|
|
||||||
// runOn(second) {
|
runOn(second) {
|
||||||
// cluster.join(firstAddress)
|
cluster.join(firstAddress)
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// awaitCond {
|
awaitCond {
|
||||||
// cluster.latestGossip.members.exists { member ⇒
|
cluster.latestGossip.members.exists { member ⇒
|
||||||
// member.address == secondAddress && member.status == MemberStatus.Up
|
member.address == secondAddress && member.status == MemberStatus.Up
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// cluster.latestGossip.members.size must be(2)
|
cluster.latestGossip.members.size must be(2)
|
||||||
// awaitCond(cluster.convergence.isDefined)
|
awaitCond(cluster.convergence.isDefined)
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
//}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,131 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package akka.cluster
|
|
||||||
|
|
||||||
import akka.testkit._
|
|
||||||
import akka.dispatch._
|
|
||||||
import akka.actor._
|
|
||||||
import akka.remote._
|
|
||||||
import akka.util.duration._
|
|
||||||
|
|
||||||
import com.typesafe.config._
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
|
||||||
|
|
||||||
class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
|
|
||||||
val portPrefix = 5
|
|
||||||
|
|
||||||
var node1: Cluster = _
|
|
||||||
var node2: Cluster = _
|
|
||||||
var node3: Cluster = _
|
|
||||||
|
|
||||||
var system1: ActorSystemImpl = _
|
|
||||||
var system2: ActorSystemImpl = _
|
|
||||||
var system3: ActorSystemImpl = _
|
|
||||||
|
|
||||||
try {
|
|
||||||
"A cluster of three nodes" must {
|
|
||||||
|
|
||||||
// ======= NODE 1 ========
|
|
||||||
system1 = ActorSystem("system1", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d550
|
|
||||||
}""".format(portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node1 = Cluster(system1)
|
|
||||||
val address1 = node1.remoteAddress
|
|
||||||
|
|
||||||
// ======= NODE 2 ========
|
|
||||||
system2 = ActorSystem("system2", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d551
|
|
||||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
|
||||||
}""".format(portPrefix, portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node2 = Cluster(system2)
|
|
||||||
val address2 = node2.remoteAddress
|
|
||||||
|
|
||||||
// ======= NODE 3 ========
|
|
||||||
system3 = ActorSystem("system3", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d552
|
|
||||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
|
||||||
}""".format(portPrefix, portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node3 = Cluster(system3)
|
|
||||||
val address3 = node3.remoteAddress
|
|
||||||
|
|
||||||
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
|
|
||||||
|
|
||||||
println("Give the system time to converge...")
|
|
||||||
awaitConvergence(node1 :: node2 :: node3 :: Nil)
|
|
||||||
|
|
||||||
// check leader
|
|
||||||
node1.isLeader must be(true)
|
|
||||||
node2.isLeader must be(false)
|
|
||||||
node3.isLeader must be(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
|
|
||||||
|
|
||||||
// shut down system1 - the leader
|
|
||||||
node1.shutdown()
|
|
||||||
system1.shutdown()
|
|
||||||
|
|
||||||
// user marks node1 as DOWN
|
|
||||||
node2.down(address1)
|
|
||||||
|
|
||||||
println("Give the system time to converge...")
|
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
|
||||||
awaitConvergence(node2 :: node3 :: Nil)
|
|
||||||
|
|
||||||
// check leader
|
|
||||||
node2.isLeader must be(true)
|
|
||||||
node3.isLeader must be(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
"be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in {
|
|
||||||
|
|
||||||
// shut down system1 - the leader
|
|
||||||
node2.shutdown()
|
|
||||||
system2.shutdown()
|
|
||||||
|
|
||||||
// user marks node2 as DOWN
|
|
||||||
node3.down(address2)
|
|
||||||
|
|
||||||
println("Give the system time to converge...")
|
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
|
||||||
awaitConvergence(node3 :: Nil)
|
|
||||||
|
|
||||||
// check leader
|
|
||||||
node3.isLeader must be(true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case e: Exception ⇒
|
|
||||||
e.printStackTrace
|
|
||||||
fail(e.toString)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def atTermination() {
|
|
||||||
if (node1 ne null) node1.shutdown()
|
|
||||||
if (system1 ne null) system1.shutdown()
|
|
||||||
|
|
||||||
if (node2 ne null) node2.shutdown()
|
|
||||||
if (system2 ne null) system2.shutdown()
|
|
||||||
|
|
||||||
if (node3 ne null) node3.shutdown()
|
|
||||||
if (system3 ne null) system3.shutdown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -6,3 +6,4 @@ Additional Information
|
||||||
|
|
||||||
recipes
|
recipes
|
||||||
language-bindings
|
language-bindings
|
||||||
|
osgi
|
||||||
|
|
|
||||||
10
akka-docs/additional/osgi.rst
Normal file
10
akka-docs/additional/osgi.rst
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
Akka in OSGi
|
||||||
|
============
|
||||||
|
|
||||||
|
Configuring the OSGi Framework
|
||||||
|
------------------------------
|
||||||
|
|
||||||
|
To use Akka in an OSGi environment, the ``org.osgi.framework.bootdelegation``
|
||||||
|
property must be set to always delegate the ``sun.misc`` package to the boot classloader
|
||||||
|
instead of resolving it through the normal OSGi class space.
|
||||||
|
|
||||||
|
|
@ -55,6 +55,8 @@ actors cannot be orphaned or attached to supervisors from the outside, which
|
||||||
might otherwise catch them unawares. In addition, this yields a natural and
|
might otherwise catch them unawares. In addition, this yields a natural and
|
||||||
clean shutdown procedure for (sub-trees of) actor applications.
|
clean shutdown procedure for (sub-trees of) actor applications.
|
||||||
|
|
||||||
|
.. _supervision-restart:
|
||||||
|
|
||||||
What Restarting Means
|
What Restarting Means
|
||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
|
|
|
||||||
18
akka-docs/java/code/docs/pattern/JavaTemplate.java
Normal file
18
akka-docs/java/code/docs/pattern/JavaTemplate.java
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.pattern;
|
||||||
|
|
||||||
|
// this part will not appear in the docs
|
||||||
|
|
||||||
|
//#all-of-it
|
||||||
|
class JavaTemplate {
|
||||||
|
public JavaTemplate() {
|
||||||
|
System.out.println("Hello, Template!");
|
||||||
|
}
|
||||||
|
//#uninteresting-stuff
|
||||||
|
// don’t show this plumbimg
|
||||||
|
//#uninteresting-stuff
|
||||||
|
}
|
||||||
|
//#all-of-it
|
||||||
|
|
@ -72,6 +72,8 @@ There are 4 different types of message dispatchers:
|
||||||
|
|
||||||
- This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.
|
- This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.
|
||||||
|
|
||||||
|
- All the actors share a single Mailbox that they get their messages from.
|
||||||
|
|
||||||
- It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
|
- It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
|
||||||
|
|
||||||
- Sharability: Actors of the same type only
|
- Sharability: Actors of the same type only
|
||||||
|
|
@ -85,7 +87,7 @@ There are 4 different types of message dispatchers:
|
||||||
"thread-pool-executor" or the FQCN of
|
"thread-pool-executor" or the FQCN of
|
||||||
an ``akka.dispatcher.ExecutorServiceConfigurator``
|
an ``akka.dispatcher.ExecutorServiceConfigurator``
|
||||||
|
|
||||||
- Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification.
|
- Note that you can **not** use a ``BalancingDispatcher`` as a **Router Dispatcher**. (You can however use it for the **Routees**)
|
||||||
|
|
||||||
* CallingThreadDispatcher
|
* CallingThreadDispatcher
|
||||||
|
|
||||||
|
|
|
||||||
33
akka-docs/java/howto.rst
Normal file
33
akka-docs/java/howto.rst
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
|
||||||
|
.. _howto-java:
|
||||||
|
|
||||||
|
######################
|
||||||
|
HowTo: Common Patterns
|
||||||
|
######################
|
||||||
|
|
||||||
|
This section lists common actor patterns which have been found to be useful,
|
||||||
|
elegant or instructive. Anything is welcome, example topics being message
|
||||||
|
routing strategies, supervision patterns, restart handling, etc. As a special
|
||||||
|
bonus, additions to this section are marked with the contributor’s name, and it
|
||||||
|
would be nice if every Akka user who finds a recurring pattern in his or her
|
||||||
|
code could share it for the profit of all. Where applicable it might also make
|
||||||
|
sense to add to the ``akka.pattern`` package for creating an `OTP-like library
|
||||||
|
<http://www.erlang.org/doc/man_index.html>`_.
|
||||||
|
|
||||||
|
Template Pattern
|
||||||
|
================
|
||||||
|
|
||||||
|
*Contributed by: N. N.*
|
||||||
|
|
||||||
|
This is an especially nice pattern, since it does even come with some empty example code:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/pattern/JavaTemplate.java
|
||||||
|
:include: all-of-it
|
||||||
|
:exclude: uninteresting-stuff
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
Spread the word: this is the easiest way to get famous!
|
||||||
|
|
||||||
|
Please keep this pattern at the end of this file.
|
||||||
|
|
||||||
|
|
@ -24,3 +24,4 @@ Java API
|
||||||
extending-akka
|
extending-akka
|
||||||
zeromq
|
zeromq
|
||||||
microkernel
|
microkernel
|
||||||
|
howto
|
||||||
|
|
|
||||||
|
|
@ -380,11 +380,16 @@ The dispatcher for created children of the router will be taken from
|
||||||
makes sense to configure the :class:`BalancingDispatcher` if the precise
|
makes sense to configure the :class:`BalancingDispatcher` if the precise
|
||||||
routing is not so important (i.e. no consistent hashing or round-robin is
|
routing is not so important (i.e. no consistent hashing or round-robin is
|
||||||
required); this enables newly created routees to pick up work immediately by
|
required); this enables newly created routees to pick up work immediately by
|
||||||
stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher``
|
stealing it from their siblings.
|
||||||
together with any kind of ``Router``, trying to do so will make your actor fail verification.
|
|
||||||
|
|
||||||
The “head” router, of course, cannot run on the same balancing dispatcher,
|
.. note::
|
||||||
because it does not process the same messages, hence this special actor does
|
|
||||||
|
If you provide a collection of actors to route to, then they will still use the same dispatcher
|
||||||
|
that was configured for them in their ``Props``, it is not possible to change an actors dispatcher
|
||||||
|
after it has been created.
|
||||||
|
|
||||||
|
The “head” router cannot always run on the same dispatcher, because it
|
||||||
|
does not process the same type of messages, hence this special actor does
|
||||||
not use the dispatcher configured in :class:`Props`, but takes the
|
not use the dispatcher configured in :class:`Props`, but takes the
|
||||||
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
|
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
|
||||||
the actor system’s default dispatcher. All standard routers allow setting this
|
the actor system’s default dispatcher. All standard routers allow setting this
|
||||||
|
|
@ -393,3 +398,31 @@ implement the method in a suitable way.
|
||||||
|
|
||||||
.. includecode:: code/docs/jrouting/CustomRouterDocTestBase.java#dispatchers
|
.. includecode:: code/docs/jrouting/CustomRouterDocTestBase.java#dispatchers
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
It is not allowed to configure the ``routerDispatcher`` to be a
|
||||||
|
:class:`BalancingDispatcher` since the messages meant for the special
|
||||||
|
router actor cannot be processed by any other actor.
|
||||||
|
|
||||||
|
At first glance there seems to be an overlap between the
|
||||||
|
:class:`BalancingDispatcher` and Routers, but they complement each other.
|
||||||
|
The balancing dispatcher is in charge of running the actors while the routers
|
||||||
|
are in charge of deciding which message goes where. A router can also have
|
||||||
|
children that span multiple actor systems, even remote ones, but a dispatcher
|
||||||
|
lives inside a single actor system.
|
||||||
|
|
||||||
|
When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher`
|
||||||
|
there are some configuration settings to take into account.
|
||||||
|
|
||||||
|
- There can only be ``nr-of-instances`` messages being processed at the same
|
||||||
|
time no matter how many threads are configured for the
|
||||||
|
:class:`BalancingDispatcher`.
|
||||||
|
|
||||||
|
- Having ``throughput`` set to a low number makes no sense since you will only
|
||||||
|
be handing off to another actor that processes the same :class:`MailBox`
|
||||||
|
as yourself, which can be costly. Either the message just got into the
|
||||||
|
mailbox and you can receive it as well as anybody else, or everybody else
|
||||||
|
is busy and you are the only one available to receive the message.
|
||||||
|
|
||||||
|
- Resizing the number of routees only introduce inertia, since resizing
|
||||||
|
is performed at specified intervals, but work stealing is instantaneous.
|
||||||
|
|
|
||||||
|
|
@ -115,6 +115,12 @@ Here is an example:
|
||||||
This way of creating the Actor is also great for integrating with Dependency Injection
|
This way of creating the Actor is also great for integrating with Dependency Injection
|
||||||
(DI) frameworks like Guice or Spring.
|
(DI) frameworks like Guice or Spring.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
You might be tempted at times to offer an ``UntypedActor`` factory which
|
||||||
|
always returns the same instance, e.g. by using a static field. This is not
|
||||||
|
supported, as it goes against the meaning of an actor restart, which is
|
||||||
|
described here: :ref:`supervision-restart`.
|
||||||
|
|
||||||
UntypedActor API
|
UntypedActor API
|
||||||
================
|
================
|
||||||
|
|
|
||||||
|
|
@ -105,6 +105,13 @@ Here is an example:
|
||||||
|
|
||||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#creating-constructor
|
.. includecode:: code/docs/actor/ActorDocSpec.scala#creating-constructor
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
You might be tempted at times to offer an ``Actor`` factory which always
|
||||||
|
returns the same instance, e.g. by using a ``lazy val`` or an
|
||||||
|
``object ... extends Actor``. This is not supported, as it goes against the
|
||||||
|
meaning of an actor restart, which is described here:
|
||||||
|
:ref:`supervision-restart`.
|
||||||
|
|
||||||
Props
|
Props
|
||||||
-----
|
-----
|
||||||
|
|
|
||||||
16
akka-docs/scala/code/docs/pattern/ScalaTemplate.scala
Normal file
16
akka-docs/scala/code/docs/pattern/ScalaTemplate.scala
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.pattern
|
||||||
|
|
||||||
|
// this part will not appear in the docs
|
||||||
|
|
||||||
|
//#all-of-it
|
||||||
|
class ScalaTemplate {
|
||||||
|
println("Hello, Template!")
|
||||||
|
//#uninteresting-stuff
|
||||||
|
// don’t show this plumbimg
|
||||||
|
//#uninteresting-stuff
|
||||||
|
}
|
||||||
|
//#all-of-it
|
||||||
|
|
@ -73,6 +73,8 @@ There are 4 different types of message dispatchers:
|
||||||
|
|
||||||
- This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.
|
- This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.
|
||||||
|
|
||||||
|
- All the actors share a single Mailbox that they get their messages from.
|
||||||
|
|
||||||
- It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
|
- It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
|
||||||
|
|
||||||
- Sharability: Actors of the same type only
|
- Sharability: Actors of the same type only
|
||||||
|
|
@ -86,7 +88,7 @@ There are 4 different types of message dispatchers:
|
||||||
"thread-pool-executor" or the FQCN of
|
"thread-pool-executor" or the FQCN of
|
||||||
an ``akka.dispatcher.ExecutorServiceConfigurator``
|
an ``akka.dispatcher.ExecutorServiceConfigurator``
|
||||||
|
|
||||||
- Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification.
|
- Note that you can **not** use a ``BalancingDispatcher`` as a **Router Dispatcher**. (You can however use it for the **Routees**)
|
||||||
|
|
||||||
* CallingThreadDispatcher
|
* CallingThreadDispatcher
|
||||||
|
|
||||||
|
|
@ -114,7 +116,7 @@ And then using it:
|
||||||
|
|
||||||
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher
|
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher
|
||||||
|
|
||||||
Note that ``thread-pool-executor`` configuration as per the above ``my-thread-pool-dispatcher`` exmaple is
|
Note that ``thread-pool-executor`` configuration as per the above ``my-thread-pool-dispatcher`` example is
|
||||||
NOT applicable. This is because every actor will have its own thread pool when using ``PinnedDispatcher``,
|
NOT applicable. This is because every actor will have its own thread pool when using ``PinnedDispatcher``,
|
||||||
and that pool will have only one thread.
|
and that pool will have only one thread.
|
||||||
|
|
||||||
|
|
|
||||||
33
akka-docs/scala/howto.rst
Normal file
33
akka-docs/scala/howto.rst
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
|
||||||
|
.. _howto-scala:
|
||||||
|
|
||||||
|
######################
|
||||||
|
HowTo: Common Patterns
|
||||||
|
######################
|
||||||
|
|
||||||
|
This section lists common actor patterns which have been found to be useful,
|
||||||
|
elegant or instructive. Anything is welcome, example topics being message
|
||||||
|
routing strategies, supervision patterns, restart handling, etc. As a special
|
||||||
|
bonus, additions to this section are marked with the contributor’s name, and it
|
||||||
|
would be nice if every Akka user who finds a recurring pattern in his or her
|
||||||
|
code could share it for the profit of all. Where applicable it might also make
|
||||||
|
sense to add to the ``akka.pattern`` package for creating an `OTP-like library
|
||||||
|
<http://www.erlang.org/doc/man_index.html>`_.
|
||||||
|
|
||||||
|
Template Pattern
|
||||||
|
================
|
||||||
|
|
||||||
|
*Contributed by: N. N.*
|
||||||
|
|
||||||
|
This is an especially nice pattern, since it does even come with some empty example code:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/pattern/ScalaTemplate.scala
|
||||||
|
:include: all-of-it
|
||||||
|
:exclude: uninteresting-stuff
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
Spread the word: this is the easiest way to get famous!
|
||||||
|
|
||||||
|
Please keep this pattern at the end of this file.
|
||||||
|
|
||||||
|
|
@ -28,3 +28,4 @@ Scala API
|
||||||
zeromq
|
zeromq
|
||||||
microkernel
|
microkernel
|
||||||
camel
|
camel
|
||||||
|
howto
|
||||||
|
|
|
||||||
|
|
@ -380,9 +380,7 @@ The dispatcher for created children of the router will be taken from
|
||||||
makes sense to configure the :class:`BalancingDispatcher` if the precise
|
makes sense to configure the :class:`BalancingDispatcher` if the precise
|
||||||
routing is not so important (i.e. no consistent hashing or round-robin is
|
routing is not so important (i.e. no consistent hashing or round-robin is
|
||||||
required); this enables newly created routees to pick up work immediately by
|
required); this enables newly created routees to pick up work immediately by
|
||||||
stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher``
|
stealing it from their siblings.
|
||||||
together with any kind of ``Router``, trying to do so will make your actor fail verification.
|
|
||||||
|
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
||||||
|
|
@ -390,8 +388,8 @@ together with any kind of ``Router``, trying to do so will make your actor fail
|
||||||
that was configured for them in their ``Props``, it is not possible to change an actors dispatcher
|
that was configured for them in their ``Props``, it is not possible to change an actors dispatcher
|
||||||
after it has been created.
|
after it has been created.
|
||||||
|
|
||||||
The “head” router, of course, cannot run on the same balancing dispatcher,
|
The “head” router cannot always run on the same dispatcher, because it
|
||||||
because it does not process the same messages, hence this special actor does
|
does not process the same type of messages, hence this special actor does
|
||||||
not use the dispatcher configured in :class:`Props`, but takes the
|
not use the dispatcher configured in :class:`Props`, but takes the
|
||||||
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
|
``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to
|
||||||
the actor system’s default dispatcher. All standard routers allow setting this
|
the actor system’s default dispatcher. All standard routers allow setting this
|
||||||
|
|
@ -400,3 +398,31 @@ implement the method in a suitable way.
|
||||||
|
|
||||||
.. includecode:: code/docs/routing/RouterDocSpec.scala#dispatchers
|
.. includecode:: code/docs/routing/RouterDocSpec.scala#dispatchers
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
It is not allowed to configure the ``routerDispatcher`` to be a
|
||||||
|
:class:`BalancingDispatcher` since the messages meant for the special
|
||||||
|
router actor cannot be processed by any other actor.
|
||||||
|
|
||||||
|
At first glance there seems to be an overlap between the
|
||||||
|
:class:`BalancingDispatcher` and Routers, but they complement each other.
|
||||||
|
The balancing dispatcher is in charge of running the actors while the routers
|
||||||
|
are in charge of deciding which message goes where. A router can also have
|
||||||
|
children that span multiple actor systems, even remote ones, but a dispatcher
|
||||||
|
lives inside a single actor system.
|
||||||
|
|
||||||
|
When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher`
|
||||||
|
there are some configuration settings to take into account.
|
||||||
|
|
||||||
|
- There can only be ``nr-of-instances`` messages being processed at the same
|
||||||
|
time no matter how many threads are configured for the
|
||||||
|
:class:`BalancingDispatcher`.
|
||||||
|
|
||||||
|
- Having ``throughput`` set to a low number makes no sense since you will only
|
||||||
|
be handing off to another actor that processes the same :class:`MailBox`
|
||||||
|
as yourself, which can be costly. Either the message just got into the
|
||||||
|
mailbox and you can receive it as well as anybody else, or everybody else
|
||||||
|
is busy and you are the only one available to receive the message.
|
||||||
|
|
||||||
|
- Resizing the number of routees only introduce inertia, since resizing
|
||||||
|
is performed at specified intervals, but work stealing is instantaneous.
|
||||||
|
|
|
||||||
|
|
@ -84,7 +84,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
|
||||||
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
|
barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref)
|
||||||
a.send(barrier, EnterBarrier("bar"))
|
a.send(barrier, EnterBarrier("bar"))
|
||||||
noMsg(a, b)
|
noMsg(a, b)
|
||||||
within(1 second) {
|
within(2 second) {
|
||||||
b.send(barrier, EnterBarrier("bar"))
|
b.send(barrier, EnterBarrier("bar"))
|
||||||
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
|
|
@ -100,7 +100,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
|
||||||
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
|
barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref)
|
||||||
b.send(barrier, EnterBarrier("bar"))
|
b.send(barrier, EnterBarrier("bar"))
|
||||||
noMsg(a, b, c)
|
noMsg(a, b, c)
|
||||||
within(1 second) {
|
within(2 second) {
|
||||||
c.send(barrier, EnterBarrier("bar"))
|
c.send(barrier, EnterBarrier("bar"))
|
||||||
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
|
|
@ -119,7 +119,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
|
||||||
barrier ! RemoveClient(A)
|
barrier ! RemoveClient(A)
|
||||||
barrier ! ClientDisconnected(A)
|
barrier ! ClientDisconnected(A)
|
||||||
noMsg(a, b, c)
|
noMsg(a, b, c)
|
||||||
b.within(1 second) {
|
b.within(2 second) {
|
||||||
barrier ! RemoveClient(C)
|
barrier ! RemoveClient(C)
|
||||||
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
}
|
}
|
||||||
|
|
@ -265,7 +265,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
|
||||||
b.expectMsg(ToClient(Done))
|
b.expectMsg(ToClient(Done))
|
||||||
a.send(barrier, EnterBarrier("bar"))
|
a.send(barrier, EnterBarrier("bar"))
|
||||||
noMsg(a, b)
|
noMsg(a, b)
|
||||||
within(1 second) {
|
within(2 second) {
|
||||||
b.send(barrier, EnterBarrier("bar"))
|
b.send(barrier, EnterBarrier("bar"))
|
||||||
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
|
|
@ -284,7 +284,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
|
||||||
c.expectMsg(ToClient(Done))
|
c.expectMsg(ToClient(Done))
|
||||||
b.send(barrier, EnterBarrier("bar"))
|
b.send(barrier, EnterBarrier("bar"))
|
||||||
noMsg(a, b, c)
|
noMsg(a, b, c)
|
||||||
within(1 second) {
|
within(2 second) {
|
||||||
c.send(barrier, EnterBarrier("bar"))
|
c.send(barrier, EnterBarrier("bar"))
|
||||||
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
a.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
|
|
@ -306,7 +306,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with
|
||||||
barrier ! Remove(A)
|
barrier ! Remove(A)
|
||||||
barrier ! ClientDisconnected(A)
|
barrier ! ClientDisconnected(A)
|
||||||
noMsg(a, b, c)
|
noMsg(a, b, c)
|
||||||
b.within(1 second) {
|
b.within(2 second) {
|
||||||
barrier ! Remove(C)
|
barrier ! Remove(C)
|
||||||
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
b.expectMsg(ToClient(BarrierResult("bar", true)))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,8 @@ import com.typesafe.sbtmultijvm.MultiJvmPlugin
|
||||||
import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions }
|
import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions }
|
||||||
import com.typesafe.sbtscalariform.ScalariformPlugin
|
import com.typesafe.sbtscalariform.ScalariformPlugin
|
||||||
import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys
|
import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys
|
||||||
|
import com.typesafe.sbtosgi.OsgiPlugin.osgiSettings
|
||||||
|
import com.typesafe.sbtosgi.OsgiKeys
|
||||||
import java.lang.Boolean.getBoolean
|
import java.lang.Boolean.getBoolean
|
||||||
import Sphinx.{ sphinxDocs, sphinxHtml, sphinxLatex, sphinxPdf, sphinxPygments, sphinxTags }
|
import Sphinx.{ sphinxDocs, sphinxHtml, sphinxLatex, sphinxPdf, sphinxPygments, sphinxTags }
|
||||||
|
|
||||||
|
|
@ -44,7 +46,7 @@ object AkkaBuild extends Build {
|
||||||
lazy val actor = Project(
|
lazy val actor = Project(
|
||||||
id = "akka-actor",
|
id = "akka-actor",
|
||||||
base = file("akka-actor"),
|
base = file("akka-actor"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.actor ++ Seq(
|
||||||
autoCompilerPlugins := true,
|
autoCompilerPlugins := true,
|
||||||
libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) },
|
libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) },
|
||||||
scalacOptions += "-P:continuations:enable",
|
scalacOptions += "-P:continuations:enable",
|
||||||
|
|
@ -78,14 +80,14 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-remote",
|
id = "akka-remote",
|
||||||
base = file("akka-remote"),
|
base = file("akka-remote"),
|
||||||
dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"),
|
dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"),
|
||||||
settings = defaultSettings ++ multiJvmSettings ++ Seq(
|
settings = defaultSettings ++ multiJvmSettings ++ OSGi.remote ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.remote,
|
libraryDependencies ++= Dependencies.remote,
|
||||||
// disable parallel tests
|
// disable parallel tests
|
||||||
parallelExecution in Test := false,
|
parallelExecution in Test := false,
|
||||||
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
||||||
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
||||||
},
|
},
|
||||||
scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"),
|
scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions,
|
||||||
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
||||||
test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x }
|
test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x }
|
||||||
)
|
)
|
||||||
|
|
@ -101,7 +103,7 @@ object AkkaBuild extends Build {
|
||||||
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
||||||
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
||||||
},
|
},
|
||||||
scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"),
|
scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions,
|
||||||
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
||||||
test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x }
|
test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x }
|
||||||
)
|
)
|
||||||
|
|
@ -111,14 +113,14 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-cluster",
|
id = "akka-cluster",
|
||||||
base = file("akka-cluster"),
|
base = file("akka-cluster"),
|
||||||
dependencies = Seq(remote, remoteTests % "compile;test->test;multi-jvm->multi-jvm", testkit % "test->test"),
|
dependencies = Seq(remote, remoteTests % "compile;test->test;multi-jvm->multi-jvm", testkit % "test->test"),
|
||||||
settings = defaultSettings ++ multiJvmSettings ++ Seq(
|
settings = defaultSettings ++ multiJvmSettings ++ OSGi.cluster ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.cluster,
|
libraryDependencies ++= Dependencies.cluster,
|
||||||
// disable parallel tests
|
// disable parallel tests
|
||||||
parallelExecution in Test := false,
|
parallelExecution in Test := false,
|
||||||
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
|
||||||
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
|
||||||
},
|
},
|
||||||
scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"),
|
scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions,
|
||||||
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
jvmOptions in MultiJvm := defaultMultiJvmOptions,
|
||||||
test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x }
|
test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x }
|
||||||
)
|
)
|
||||||
|
|
@ -128,7 +130,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-slf4j",
|
id = "akka-slf4j",
|
||||||
base = file("akka-slf4j"),
|
base = file("akka-slf4j"),
|
||||||
dependencies = Seq(actor, testkit % "test->test"),
|
dependencies = Seq(actor, testkit % "test->test"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.slf4j ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.slf4j
|
libraryDependencies ++= Dependencies.slf4j
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
@ -137,7 +139,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-agent",
|
id = "akka-agent",
|
||||||
base = file("akka-agent"),
|
base = file("akka-agent"),
|
||||||
dependencies = Seq(actor, testkit % "test->test"),
|
dependencies = Seq(actor, testkit % "test->test"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.agent ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.agent
|
libraryDependencies ++= Dependencies.agent
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
@ -146,7 +148,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-transactor",
|
id = "akka-transactor",
|
||||||
base = file("akka-transactor"),
|
base = file("akka-transactor"),
|
||||||
dependencies = Seq(actor, testkit % "test->test"),
|
dependencies = Seq(actor, testkit % "test->test"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.transactor ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.transactor
|
libraryDependencies ++= Dependencies.transactor
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
@ -164,7 +166,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-mailboxes-common",
|
id = "akka-mailboxes-common",
|
||||||
base = file("akka-durable-mailboxes/akka-mailboxes-common"),
|
base = file("akka-durable-mailboxes/akka-mailboxes-common"),
|
||||||
dependencies = Seq(remote, testkit % "compile;test->test"),
|
dependencies = Seq(remote, testkit % "compile;test->test"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.mailboxesCommon ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.mailboxes,
|
libraryDependencies ++= Dependencies.mailboxes,
|
||||||
// DurableMailboxSpec published in akka-mailboxes-common-test
|
// DurableMailboxSpec published in akka-mailboxes-common-test
|
||||||
publishArtifact in Test := true
|
publishArtifact in Test := true
|
||||||
|
|
@ -175,7 +177,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-file-mailbox",
|
id = "akka-file-mailbox",
|
||||||
base = file("akka-durable-mailboxes/akka-file-mailbox"),
|
base = file("akka-durable-mailboxes/akka-file-mailbox"),
|
||||||
dependencies = Seq(mailboxesCommon % "compile;test->test", testkit % "test"),
|
dependencies = Seq(mailboxesCommon % "compile;test->test", testkit % "test"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.fileMailbox ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.fileMailbox
|
libraryDependencies ++= Dependencies.fileMailbox
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
@ -184,7 +186,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-zeromq",
|
id = "akka-zeromq",
|
||||||
base = file("akka-zeromq"),
|
base = file("akka-zeromq"),
|
||||||
dependencies = Seq(actor, testkit % "test;test->test"),
|
dependencies = Seq(actor, testkit % "test;test->test"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.zeroMQ ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.zeroMQ
|
libraryDependencies ++= Dependencies.zeroMQ
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
@ -202,7 +204,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-camel",
|
id = "akka-camel",
|
||||||
base = file("akka-camel"),
|
base = file("akka-camel"),
|
||||||
dependencies = Seq(actor, slf4j, testkit % "test->test"),
|
dependencies = Seq(actor, slf4j, testkit % "test->test"),
|
||||||
settings = defaultSettings ++ Seq(
|
settings = defaultSettings ++ OSGi.camel ++ Seq(
|
||||||
libraryDependencies ++= Dependencies.camel
|
libraryDependencies ++= Dependencies.camel
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
@ -298,7 +300,7 @@ object AkkaBuild extends Build {
|
||||||
|
|
||||||
val defaultExcludedTags = Seq("timing", "long-running")
|
val defaultExcludedTags = Seq("timing", "long-running")
|
||||||
|
|
||||||
val defaultMultiJvmOptions: Seq[String] = {
|
lazy val defaultMultiJvmOptions: Seq[String] = {
|
||||||
(System.getProperty("akka.test.timefactor") match {
|
(System.getProperty("akka.test.timefactor") match {
|
||||||
case null => Nil
|
case null => Nil
|
||||||
case x => List("-Dakka.test.timefactor=" + x)
|
case x => List("-Dakka.test.timefactor=" + x)
|
||||||
|
|
@ -306,6 +308,31 @@ object AkkaBuild extends Build {
|
||||||
(if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil)
|
(if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)
|
||||||
|
lazy val defaultExcludeTestNames: Seq[String] = {
|
||||||
|
val exclude = System.getProperty("akka.test.names.exclude", "")
|
||||||
|
if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq
|
||||||
|
}
|
||||||
|
|
||||||
|
// for excluding tests by tag (or use system property: -Dakka.test.tags.exclude=timing)
|
||||||
|
lazy val defaultExcludeTestTags: Seq[String] = {
|
||||||
|
val exclude = System.getProperty("akka.test.tags.exclude", "")
|
||||||
|
if (exclude.isEmpty) defaultExcludedTags else exclude.split(",").toSeq
|
||||||
|
}
|
||||||
|
|
||||||
|
// for including tests by tag (or use system property: -Dakka.test.tags.include=timing)
|
||||||
|
lazy val defaultIncludeTestTags: Seq[String] = {
|
||||||
|
val include = System.getProperty("akka.test.tags.include", "")
|
||||||
|
if (include.isEmpty) Seq.empty else include.split(",").toSeq
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy val defaultMultiJvmScalatestOptions: Seq[String] = {
|
||||||
|
val excludeTags = (defaultExcludeTestTags.toSet -- defaultIncludeTestTags.toSet).toSeq
|
||||||
|
Seq("-r", "org.scalatest.akka.QuietReporter") ++
|
||||||
|
(if (excludeTags.isEmpty) Seq.empty else Seq("-l", excludeTags.mkString(" "))) ++
|
||||||
|
(if (defaultIncludeTestTags.isEmpty) Seq.empty else Seq("-n", defaultIncludeTestTags.mkString(" ")))
|
||||||
|
}
|
||||||
|
|
||||||
lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq(
|
lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq(
|
||||||
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
|
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
|
||||||
|
|
||||||
|
|
@ -318,23 +345,9 @@ object AkkaBuild extends Build {
|
||||||
|
|
||||||
parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean,
|
parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean,
|
||||||
|
|
||||||
// for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)
|
excludeTestNames := defaultExcludeTestNames,
|
||||||
excludeTestNames := {
|
excludeTestTags := defaultExcludeTestTags,
|
||||||
val exclude = System.getProperty("akka.test.names.exclude", "")
|
includeTestTags := defaultIncludeTestTags,
|
||||||
if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq
|
|
||||||
},
|
|
||||||
|
|
||||||
// for excluding tests by tag (or use system property: -Dakka.test.tags.exclude=timing)
|
|
||||||
excludeTestTags := {
|
|
||||||
val exclude = System.getProperty("akka.test.tags.exclude", "")
|
|
||||||
if (exclude.isEmpty) defaultExcludedTags else exclude.split(",").toSeq
|
|
||||||
},
|
|
||||||
|
|
||||||
// for including tests by tag (or use system property: -Dakka.test.tags.include=timing)
|
|
||||||
includeTestTags := {
|
|
||||||
val include = System.getProperty("akka.test.tags.include", "")
|
|
||||||
if (include.isEmpty) Seq.empty else include.split(",").toSeq
|
|
||||||
},
|
|
||||||
|
|
||||||
// add filters for tests excluded by name
|
// add filters for tests excluded by name
|
||||||
testOptions in Test <++= excludeTestNames map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) },
|
testOptions in Test <++= excludeTestNames map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) },
|
||||||
|
|
@ -457,3 +470,38 @@ object Dependency {
|
||||||
val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2
|
val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OSGi settings
|
||||||
|
|
||||||
|
object OSGi {
|
||||||
|
|
||||||
|
val actor = exports(Seq("akka*"))
|
||||||
|
|
||||||
|
val agent = exports(Seq("akka.agent.*"))
|
||||||
|
|
||||||
|
val camel = exports(Seq("akka.camel.*", "akka.camelexamples"))
|
||||||
|
|
||||||
|
val cluster = exports(Seq("akka.cluster.*"))
|
||||||
|
|
||||||
|
val fileMailbox = exports(Seq("akka.actor.mailbox.*"))
|
||||||
|
|
||||||
|
val mailboxesCommon = exports(Seq("akka.actor.mailbox.*"))
|
||||||
|
|
||||||
|
val remote = exports(Seq("akka.remote.*", "akka.routing.*", "akka.serialization.*"))
|
||||||
|
|
||||||
|
val slf4j = exports(Seq("akka.event.slf4j.*"))
|
||||||
|
|
||||||
|
val transactor = exports(Seq("akka.transactor.*"))
|
||||||
|
|
||||||
|
val zeroMQ = exports(Seq("akka.zeromq.*"))
|
||||||
|
|
||||||
|
def exports(packages: Seq[String]) = osgiSettings ++ Seq(
|
||||||
|
OsgiKeys.importPackage := Seq("!sun.misc", akkaImport(), configImport(), scalaImport(), "*"),
|
||||||
|
OsgiKeys.exportPackage := packages
|
||||||
|
)
|
||||||
|
|
||||||
|
def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,2.2)\"".format(packageName)
|
||||||
|
def configImport(packageName: String = "com.typesafe.config.*") = "%s;version=\"[0.4,0.5)\"".format(packageName)
|
||||||
|
def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.9.2,2.10)\"".format(packageName)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M1")
|
||||||
|
|
||||||
addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.4.0")
|
addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.4.0")
|
||||||
|
|
||||||
|
addSbtPlugin("com.typesafe.sbtosgi" % "sbtosgi" % "0.2.0")
|
||||||
|
|
||||||
resolvers ++= Seq(
|
resolvers ++= Seq(
|
||||||
// needed for sbt-assembly, which comes with sbt-multi-jvm
|
// needed for sbt-assembly, which comes with sbt-multi-jvm
|
||||||
Resolver.url("sbtonline", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns),
|
Resolver.url("sbtonline", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue