diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala index bedf51f083..5752bd7806 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorConfigurationVerificationSpec.scala @@ -42,27 +42,27 @@ class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVeri "An Actor configured with a BalancingDispatcher" must { "fail verification with a ConfigurationException if also configured with a RoundRobinRouter" in { intercept[ConfigurationException] { - system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RoundRobinRouter(2))) + system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(2).withDispatcher("balancing-dispatcher"))) } } "fail verification with a ConfigurationException if also configured with a BroadcastRouter" in { intercept[ConfigurationException] { - system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(BroadcastRouter(2))) + system.actorOf(Props[TestActor].withRouter(BroadcastRouter(2).withDispatcher("balancing-dispatcher"))) } } "fail verification with a ConfigurationException if also configured with a RandomRouter" in { intercept[ConfigurationException] { - system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(RandomRouter(2))) + system.actorOf(Props[TestActor].withRouter(RandomRouter(2).withDispatcher("balancing-dispatcher"))) } } "fail verification with a ConfigurationException if also configured with a SmallestMailboxRouter" in { intercept[ConfigurationException] { - system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(SmallestMailboxRouter(2))) + system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(2).withDispatcher("balancing-dispatcher"))) } } "fail verification with a ConfigurationException if also configured with a ScatterGatherFirstCompletedRouter" in { intercept[ConfigurationException] { - system.actorOf(Props[TestActor].withDispatcher("balancing-dispatcher").withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 2, within = 2 seconds))) + system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(nrOfInstances = 2, within = 2 seconds).withDispatcher("balancing-dispatcher"))) } } "not fail verification with a ConfigurationException also not configured with a Router" in { diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index ede4a69d7c..0a87273d61 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -128,6 +128,35 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with current.routees.size must be(2) } + "resize when busy" ignore { + + val busy = new TestLatch(1) + + val resizer = DefaultResizer( + lowerBound = 1, + upperBound = 3, + pressureThreshold = 0, + messagesPerResize = 1) + + val router = system.actorOf(Props[BusyActor].withRouter(RoundRobinRouter(resizer = Some(resizer))).withDispatcher("bal-disp")) + + val latch1 = new TestLatch(1) + router ! (latch1, busy) + Await.ready(latch1, 2 seconds) + + val latch2 = new TestLatch(1) + router ! (latch2, busy) + Await.ready(latch2, 2 seconds) + + val latch3 = new TestLatch(1) + router ! (latch3, busy) + Await.ready(latch3, 2 seconds) + + Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) + + busy.countDown() + } + "grow as needed under pressure" in { // make sure the pool starts at the expected lower limit and grows to the upper as needed // as influenced by the backlog of blocking pooled actors diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 292a437dab..dfd6200fd3 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -87,6 +87,8 @@ object Props { * Props is a ActorRef configuration object, that is thread safe and fully sharable. * Used when creating new actors through; ActorSystem.actorOf and ActorContext.actorOf. * + * In case of providing code which creates the actual Actor instance, that must not return the same instance multiple times. + * * Examples on Scala API: * {{{ * val props = Props[MyActor] @@ -145,6 +147,8 @@ case class Props( /** * Returns a new Props with the specified creator set. * + * The creator must not return the same instance multiple times. + * * Scala API. */ def withCreator(c: ⇒ Actor): Props = copy(creator = () ⇒ c) @@ -152,6 +156,8 @@ case class Props( /** * Returns a new Props with the specified creator set. * + * The creator must not return the same instance multiple times. + * * Java API. */ def withCreator(c: Creator[Actor]): Props = copy(creator = () ⇒ c.create) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 54ec2d08b4..e3c7f8348c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -68,6 +68,7 @@ object Await { * WARNING: Blocking operation, use with caution. * * @throws [[java.util.concurrent.TimeoutException]] if times out + * @throws [[java.lang.Throwable]] (throws clause is Exception due to Java) if there was a problem * @return The returned value as returned by Awaitable.result */ @throws(classOf[Exception]) diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 5bd38ad52a..b0db141aee 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -38,6 +38,9 @@ trait Effect { * A constructor/factory, takes no parameters but creates a new value of type T every call. */ trait Creator[T] { + /** + * This method must return a different instance upon every call. + */ def create(): T } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 94eed672f4..2f585a1790 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -30,10 +30,10 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _path) { // verify that a BalancingDispatcher is not used with a Router - if (_system.dispatchers.isBalancingDispatcher(_props.dispatcher) && _props.routerConfig != NoRouter) + if (_props.routerConfig != NoRouter && _system.dispatchers.isBalancingDispatcher(_props.routerConfig.routerDispatcher)) throw new ConfigurationException( "Configuration for actor [" + _path.toString + - "] is invalid - you can not use a 'BalancingDispatcher' together with any type of 'Router'") + "] is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.") /* * CAUTION: RoutedActorRef is PROBLEMATIC diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 87129a7a7c..6a7ebcee86 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -1,90 +1,95 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. -// */ -// -//package akka.cluster -// -//import org.scalatest.BeforeAndAfter -//import com.typesafe.config.ConfigFactory -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { -// val a1 = role("a1") -// val a2 = role("a2") -// val b1 = role("b1") -// val b2 = role("b2") -// val c1 = role("c1") -// val c2 = role("c2") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec -//class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -// -//abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import JoinTwoClustersMultiJvmSpec._ -// -// override def initialParticipants = 6 -// -// after { -// testConductor.enter("after") -// } -// -// val a1Address = node(a1).address -// val b1Address = node(b1).address -// val c1Address = node(c1).address -// -// "Three different clusters (A, B and C)" must { -// -// "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { -// -// runOn(a1, a2) { -// cluster.join(a1Address) -// } -// runOn(b1, b2) { -// cluster.join(b1Address) -// } -// runOn(c1, c2) { -// cluster.join(c1Address) -// } -// -// awaitUpConvergence(numberOfMembers = 2) -// -// assertLeader(a1, a2) -// assertLeader(b1, b2) -// assertLeader(c1, c2) -// -// runOn(b2) { -// cluster.join(a1Address) -// } -// -// runOn(a1, a2, b1, b2) { -// awaitUpConvergence(numberOfMembers = 4) -// } -// -// assertLeader(a1, a2, b1, b2) -// assertLeader(c1, c2) -// -// } -// -// "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { -// -// runOn(b2) { -// cluster.join(c1Address) -// } -// -// awaitUpConvergence(numberOfMembers = 6) -// -// assertLeader(a1, a2, b1, b2, c1, c2) -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { + val a1 = role("a1") + val a2 = role("a2") + val b1 = role("b1") + val b2 = role("b2") + val c1 = role("c1") + val c2 = role("c2") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec + +abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import JoinTwoClustersMultiJvmSpec._ + + override def initialParticipants = 6 + + after { + testConductor.enter("after") + } + + lazy val a1Address = node(a1).address + lazy val b1Address = node(b1).address + lazy val c1Address = node(c1).address + + "Three different clusters (A, B and C)" must { + + "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(a1, b1, c1) { + cluster + } + testConductor.enter("first-started") + + runOn(a1, a2) { + cluster.join(a1Address) + } + runOn(b1, b2) { + cluster.join(b1Address) + } + runOn(c1, c2) { + cluster.join(c1Address) + } + + awaitUpConvergence(numberOfMembers = 2) + + assertLeader(a1, a2) + assertLeader(b1, b2) + assertLeader(c1, c2) + + runOn(b2) { + cluster.join(a1Address) + } + + runOn(a1, a2, b1, b2) { + awaitUpConvergence(numberOfMembers = 4) + } + + assertLeader(a1, a2, b1, b2) + assertLeader(c1, c2) + + } + + "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { + + runOn(b2) { + cluster.join(c1Address) + } + + awaitUpConvergence(numberOfMembers = 6) + + assertLeader(a1, a2, b1, b2, c1, c2) + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala new file mode 100644 index 0000000000..54f744a6c8 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object LeaderElectionMultiJvmSpec extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down = off + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec + +abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { + import LeaderElectionMultiJvmSpec._ + + override def initialParticipants = 5 + + lazy val firstAddress = node(first).address + + // sorted in the order used by the cluster + lazy val roles = Seq(first, second, third, fourth).sorted + + "A cluster of four nodes" must { + + "be able to 'elect' a single leader" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + + if (mySelf != controller) { + cluster.join(firstAddress) + awaitUpConvergence(numberOfMembers = roles.size) + cluster.isLeader must be(mySelf == roles.head) + } + testConductor.enter("after") + } + + def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { + val currentRoles = roles.drop(alreadyShutdown) + currentRoles.size must be >= (2) + val leader = currentRoles.head + val aUser = currentRoles.last + + mySelf match { + + case `controller` ⇒ + testConductor.enter("before-shutdown") + testConductor.shutdown(leader, 0) + testConductor.removeNode(leader) + testConductor.enter("after-shutdown", "after-down", "completed") + + case `leader` ⇒ + testConductor.enter("before-shutdown") + // this node will be shutdown by the controller and doesn't participate in more barriers + + case `aUser` ⇒ + val leaderAddress = node(leader).address + testConductor.enter("before-shutdown", "after-shutdown") + // user marks the shutdown leader as DOWN + cluster.down(leaderAddress) + testConductor.enter("after-down", "completed") + + case _ if currentRoles.tail.contains(mySelf) ⇒ + // remaining cluster nodes, not shutdown + testConductor.enter("before-shutdown", "after-shutdown", "after-down") + + awaitUpConvergence(currentRoles.size - 1) + val nextExpectedLeader = currentRoles.tail.head + cluster.isLeader must be(mySelf == nextExpectedLeader) + + testConductor.enter("completed") + + } + + } + + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) + } + + "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index 6bb0f556d5..dc915912ee 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -1,77 +1,83 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. -// */ -//package akka.cluster -// -//import scala.collection.immutable.SortedSet -//import org.scalatest.BeforeAndAfter -//import com.typesafe.config.ConfigFactory -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { -// val first = role("first") -// val second = role("second") -// val third = role("third") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec -//class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec -//class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec -// -//abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) -// with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import MembershipChangeListenerMultiJvmSpec._ -// -// override def initialParticipants = 3 -// -// after { -// testConductor.enter("after") -// } -// -// "A set of connected cluster systems" must { -// -// val firstAddress = node(first).address -// val secondAddress = node(second).address -// -// "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { -// -// runOn(first, second) { -// cluster.join(firstAddress) -// val latch = TestLatch() -// cluster.registerListener(new MembershipChangeListener { -// def notify(members: SortedSet[Member]) { -// if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) -// latch.countDown() -// } -// }) -// latch.await -// cluster.convergence.isDefined must be(true) -// } -// -// } -// -// "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { -// -// runOn(third) { -// cluster.join(firstAddress) -// } -// -// val latch = TestLatch() -// cluster.registerListener(new MembershipChangeListener { -// def notify(members: SortedSet[Member]) { -// if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) -// latch.countDown() -// } -// }) -// latch.await -// cluster.convergence.isDefined must be(true) -// -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec +class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec +class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec + +abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import MembershipChangeListenerMultiJvmSpec._ + + override def initialParticipants = 3 + + after { + testConductor.enter("after") + } + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A set of connected cluster systems" must { + + "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + // make sure that the node-to-join is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + + runOn(first, second) { + cluster.join(firstAddress) + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + latch.await + cluster.convergence.isDefined must be(true) + } + + } + + "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + runOn(third) { + cluster.join(firstAddress) + } + + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + latch.await + cluster.convergence.isDefined must be(true) + + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index cadbb7b298..d5624f4999 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -45,8 +45,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ */ def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) { nodesInCluster.length must not be (0) - import Member.addressOrdering - val expectedLeader = nodesInCluster.map(role ⇒ (role, node(role).address)).sortBy(_._2).head._1 + val expectedLeader = roleOfLeader(nodesInCluster) cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) } @@ -66,6 +65,23 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ def awaitUpConvergence(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { awaitCond(cluster.latestGossip.members.size == nrOfMembers) awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(canNotBePartOfRing forall (address => !(cluster.latestGossip.members exists (_.address.port == address.port)))) + awaitCond(canNotBePartOfRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address)))) + } + + def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { + nodesInCluster.length must not be (0) + nodesInCluster.sorted.head + } + + /** + * Sort the roles in the order used by the cluster. + */ + implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { + import Member.addressOrdering + def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address) + } + + def roleName(address: Address): Option[RoleName] = { + testConductor.getNodes.await.find(node(_).address == address) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 21defd1d97..232d6ca0e7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -1,70 +1,76 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. -// */ -//package akka.cluster -// -//import com.typesafe.config.ConfigFactory -//import org.scalatest.BeforeAndAfter -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object NodeMembershipMultiJvmSpec extends MultiNodeConfig { -// val first = role("first") -// val second = role("second") -// val third = role("third") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec -//class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec -//class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec -// -//abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import NodeMembershipMultiJvmSpec._ -// -// override def initialParticipants = 3 -// -// after { -// testConductor.enter("after") -// } -// -// val firstAddress = node(first).address -// val secondAddress = node(second).address -// val thirdAddress = node(third).address -// -// "A set of connected cluster systems" must { -// -// "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { -// -// runOn(first, second) { -// cluster.join(firstAddress) -// awaitCond(cluster.latestGossip.members.size == 2) -// assertMembers(cluster.latestGossip.members, firstAddress, secondAddress) -// awaitCond { -// cluster.latestGossip.members.forall(_.status == MemberStatus.Up) -// } -// awaitCond(cluster.convergence.isDefined) -// } -// -// } -// -// "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { -// -// runOn(third) { -// cluster.join(firstAddress) -// } -// -// awaitCond(cluster.latestGossip.members.size == 3) -// assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress) -// awaitCond { -// cluster.latestGossip.members.forall(_.status == MemberStatus.Up) -// } -// awaitCond(cluster.convergence.isDefined) -// -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object NodeMembershipMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec + +abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeMembershipMultiJvmSpec._ + + override def initialParticipants = 3 + + after { + testConductor.enter("after") + } + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A set of connected cluster systems" must { + + "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + + // make sure that the node-to-join is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + + runOn(first, second) { + cluster.join(firstAddress) + awaitCond(cluster.latestGossip.members.size == 2) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress) + awaitCond { + cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + } + awaitCond(cluster.convergence.isDefined) + } + + } + + "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { + + runOn(third) { + cluster.join(firstAddress) + } + + awaitCond(cluster.latestGossip.members.size == 3) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress) + awaitCond { + cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + } + awaitCond(cluster.convergence.isDefined) + + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index ff4c06215d..fcbcce746f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -1,74 +1,74 @@ -///** -// * Copyright (C) 2009-2012 Typesafe Inc. -// */ -//package akka.cluster -// -//import com.typesafe.config.ConfigFactory -//import org.scalatest.BeforeAndAfter -//import akka.remote.testkit.MultiNodeConfig -//import akka.remote.testkit.MultiNodeSpec -//import akka.testkit._ -// -//object NodeStartupMultiJvmSpec extends MultiNodeConfig { -// val first = role("first") -// val second = role("second") -// -// commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -// -//} -// -//class NodeStartupMultiJvmNode1 extends NodeStartupSpec -//class NodeStartupMultiJvmNode2 extends NodeStartupSpec -// -//abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { -// import NodeStartupMultiJvmSpec._ -// -// override def initialParticipants = 2 -// -// after { -// testConductor.enter("after") -// } -// -// val firstAddress = node(first).address -// val secondAddress = node(second).address -// -// "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { -// -// "be a singleton cluster when started up" taggedAs LongRunningTest in { -// runOn(first) { -// awaitCond(cluster.isSingletonCluster) -// // FIXME #2117 singletonCluster should reach convergence -// //awaitCond(cluster.convergence.isDefined) -// } -// } -// -// "be in 'Joining' phase when started up" taggedAs LongRunningTest in { -// runOn(first) { -// val members = cluster.latestGossip.members -// members.size must be(1) -// -// val joiningMember = members find (_.address == firstAddress) -// joiningMember must not be (None) -// joiningMember.get.status must be(MemberStatus.Joining) -// } -// } -// } -// -// "A second cluster node" must { -// "join the other node cluster when sending a Join command" taggedAs LongRunningTest in { -// -// runOn(second) { -// cluster.join(firstAddress) -// } -// -// awaitCond { -// cluster.latestGossip.members.exists { member ⇒ -// member.address == secondAddress && member.status == MemberStatus.Up -// } -// } -// cluster.latestGossip.members.size must be(2) -// awaitCond(cluster.convergence.isDefined) -// } -// } -// -//} +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object NodeStartupMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class NodeStartupMultiJvmNode1 extends NodeStartupSpec +class NodeStartupMultiJvmNode2 extends NodeStartupSpec + +abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import NodeStartupMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { + + "be a singleton cluster when started up" taggedAs LongRunningTest in { + runOn(first) { + awaitCond(cluster.isSingletonCluster) + // FIXME #2117 singletonCluster should reach convergence + //awaitCond(cluster.convergence.isDefined) + } + } + + "be in 'Joining' phase when started up" taggedAs LongRunningTest in { + runOn(first) { + val members = cluster.latestGossip.members + members.size must be(1) + + val joiningMember = members find (_.address == firstAddress) + joiningMember must not be (None) + joiningMember.get.status must be(MemberStatus.Joining) + } + } + } + + "A second cluster node" must { + "join the other node cluster when sending a Join command" taggedAs LongRunningTest in { + + runOn(second) { + cluster.join(firstAddress) + } + + awaitCond { + cluster.latestGossip.members.exists { member ⇒ + member.address == secondAddress && member.status == MemberStatus.Up + } + } + cluster.latestGossip.members.size must be(2) + awaitCond(cluster.convergence.isDefined) + } + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala deleted file mode 100644 index c262fad8c3..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ - -import com.typesafe.config._ - -import java.net.InetSocketAddress - -class LeaderElectionSpec extends ClusterSpec with ImplicitSender { - val portPrefix = 5 - - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ - - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ - - try { - "A cluster of three nodes" must { - - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node1 = Cluster(system1) - val address1 = node1.remoteAddress - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node2 = Cluster(system2) - val address2 = node2.remoteAddress - - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node3 = Cluster(system3) - val address3 = node3.remoteAddress - - "be able to 'elect' a single leader" taggedAs LongRunningTest in { - - println("Give the system time to converge...") - awaitConvergence(node1 :: node2 :: node3 :: Nil) - - // check leader - node1.isLeader must be(true) - node2.isLeader must be(false) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node1.shutdown() - system1.shutdown() - - // user marks node1 as DOWN - node2.down(address1) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node2 :: node3 :: Nil) - - // check leader - node2.isLeader must be(true) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node2.shutdown() - system2.shutdown() - - // user marks node2 as DOWN - node3.down(address2) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node3 :: Nil) - - // check leader - node3.isLeader must be(true) - } - } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } - - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - } -} diff --git a/akka-docs/additional/index.rst b/akka-docs/additional/index.rst index b3c89356c9..284586d59d 100644 --- a/akka-docs/additional/index.rst +++ b/akka-docs/additional/index.rst @@ -6,3 +6,4 @@ Additional Information recipes language-bindings + osgi diff --git a/akka-docs/additional/osgi.rst b/akka-docs/additional/osgi.rst new file mode 100644 index 0000000000..aea554ef9c --- /dev/null +++ b/akka-docs/additional/osgi.rst @@ -0,0 +1,10 @@ +Akka in OSGi +============ + +Configuring the OSGi Framework +------------------------------ + +To use Akka in an OSGi environment, the ``org.osgi.framework.bootdelegation`` +property must be set to always delegate the ``sun.misc`` package to the boot classloader +instead of resolving it through the normal OSGi class space. + diff --git a/akka-docs/general/supervision.rst b/akka-docs/general/supervision.rst index fef3a585dd..c1bc684ce4 100644 --- a/akka-docs/general/supervision.rst +++ b/akka-docs/general/supervision.rst @@ -55,6 +55,8 @@ actors cannot be orphaned or attached to supervisors from the outside, which might otherwise catch them unawares. In addition, this yields a natural and clean shutdown procedure for (sub-trees of) actor applications. +.. _supervision-restart: + What Restarting Means --------------------- diff --git a/akka-docs/java/code/docs/pattern/JavaTemplate.java b/akka-docs/java/code/docs/pattern/JavaTemplate.java new file mode 100644 index 0000000000..7e6fd175fb --- /dev/null +++ b/akka-docs/java/code/docs/pattern/JavaTemplate.java @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern; + +// this part will not appear in the docs + +//#all-of-it +class JavaTemplate { + public JavaTemplate() { + System.out.println("Hello, Template!"); + } + //#uninteresting-stuff + // don’t show this plumbimg + //#uninteresting-stuff +} +//#all-of-it diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index f7e0db9c3c..2723883e9c 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -72,6 +72,8 @@ There are 4 different types of message dispatchers: - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. + - All the actors share a single Mailbox that they get their messages from. + - It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. - Sharability: Actors of the same type only @@ -85,7 +87,7 @@ There are 4 different types of message dispatchers: "thread-pool-executor" or the FQCN of an ``akka.dispatcher.ExecutorServiceConfigurator`` - - Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification. + - Note that you can **not** use a ``BalancingDispatcher`` as a **Router Dispatcher**. (You can however use it for the **Routees**) * CallingThreadDispatcher diff --git a/akka-docs/java/howto.rst b/akka-docs/java/howto.rst new file mode 100644 index 0000000000..333fb9e498 --- /dev/null +++ b/akka-docs/java/howto.rst @@ -0,0 +1,33 @@ + +.. _howto-java: + +###################### +HowTo: Common Patterns +###################### + +This section lists common actor patterns which have been found to be useful, +elegant or instructive. Anything is welcome, example topics being message +routing strategies, supervision patterns, restart handling, etc. As a special +bonus, additions to this section are marked with the contributor’s name, and it +would be nice if every Akka user who finds a recurring pattern in his or her +code could share it for the profit of all. Where applicable it might also make +sense to add to the ``akka.pattern`` package for creating an `OTP-like library +`_. + +Template Pattern +================ + +*Contributed by: N. N.* + +This is an especially nice pattern, since it does even come with some empty example code: + +.. includecode:: code/docs/pattern/JavaTemplate.java + :include: all-of-it + :exclude: uninteresting-stuff + +.. note:: + + Spread the word: this is the easiest way to get famous! + +Please keep this pattern at the end of this file. + diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index 981e07f869..4022092dba 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -24,3 +24,4 @@ Java API extending-akka zeromq microkernel + howto diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index a55b41c43d..16aa4cee6f 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -380,11 +380,16 @@ The dispatcher for created children of the router will be taken from makes sense to configure the :class:`BalancingDispatcher` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by -stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher`` -together with any kind of ``Router``, trying to do so will make your actor fail verification. +stealing it from their siblings. -The “head” router, of course, cannot run on the same balancing dispatcher, -because it does not process the same messages, hence this special actor does +.. note:: + + If you provide a collection of actors to route to, then they will still use the same dispatcher + that was configured for them in their ``Props``, it is not possible to change an actors dispatcher + after it has been created. + +The “head” router cannot always run on the same dispatcher, because it +does not process the same type of messages, hence this special actor does not use the dispatcher configured in :class:`Props`, but takes the ``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to the actor system’s default dispatcher. All standard routers allow setting this @@ -393,3 +398,31 @@ implement the method in a suitable way. .. includecode:: code/docs/jrouting/CustomRouterDocTestBase.java#dispatchers +.. note:: + + It is not allowed to configure the ``routerDispatcher`` to be a + :class:`BalancingDispatcher` since the messages meant for the special + router actor cannot be processed by any other actor. + +At first glance there seems to be an overlap between the +:class:`BalancingDispatcher` and Routers, but they complement each other. +The balancing dispatcher is in charge of running the actors while the routers +are in charge of deciding which message goes where. A router can also have +children that span multiple actor systems, even remote ones, but a dispatcher +lives inside a single actor system. + +When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher` +there are some configuration settings to take into account. + +- There can only be ``nr-of-instances`` messages being processed at the same + time no matter how many threads are configured for the + :class:`BalancingDispatcher`. + +- Having ``throughput`` set to a low number makes no sense since you will only + be handing off to another actor that processes the same :class:`MailBox` + as yourself, which can be costly. Either the message just got into the + mailbox and you can receive it as well as anybody else, or everybody else + is busy and you are the only one available to receive the message. + +- Resizing the number of routees only introduce inertia, since resizing + is performed at specified intervals, but work stealing is instantaneous. diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index d7c99199ed..7df286d7f7 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -115,6 +115,12 @@ Here is an example: This way of creating the Actor is also great for integrating with Dependency Injection (DI) frameworks like Guice or Spring. +.. warning:: + + You might be tempted at times to offer an ``UntypedActor`` factory which + always returns the same instance, e.g. by using a static field. This is not + supported, as it goes against the meaning of an actor restart, which is + described here: :ref:`supervision-restart`. UntypedActor API ================ diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 92c335120a..291d06e567 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -105,6 +105,13 @@ Here is an example: .. includecode:: code/docs/actor/ActorDocSpec.scala#creating-constructor +.. warning:: + + You might be tempted at times to offer an ``Actor`` factory which always + returns the same instance, e.g. by using a ``lazy val`` or an + ``object ... extends Actor``. This is not supported, as it goes against the + meaning of an actor restart, which is described here: + :ref:`supervision-restart`. Props ----- diff --git a/akka-docs/scala/code/docs/pattern/ScalaTemplate.scala b/akka-docs/scala/code/docs/pattern/ScalaTemplate.scala new file mode 100644 index 0000000000..beceae17b7 --- /dev/null +++ b/akka-docs/scala/code/docs/pattern/ScalaTemplate.scala @@ -0,0 +1,16 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +// this part will not appear in the docs + +//#all-of-it +class ScalaTemplate { + println("Hello, Template!") + //#uninteresting-stuff + // don’t show this plumbimg + //#uninteresting-stuff +} +//#all-of-it diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 478136e428..cea9ee6e0a 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -73,6 +73,8 @@ There are 4 different types of message dispatchers: - This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. + - All the actors share a single Mailbox that they get their messages from. + - It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. - Sharability: Actors of the same type only @@ -86,7 +88,7 @@ There are 4 different types of message dispatchers: "thread-pool-executor" or the FQCN of an ``akka.dispatcher.ExecutorServiceConfigurator`` - - Note that you can **not** use a ``BalancingDispatcher`` together with any kind of ``Router``, trying to do so will make your actor fail verification. + - Note that you can **not** use a ``BalancingDispatcher`` as a **Router Dispatcher**. (You can however use it for the **Routees**) * CallingThreadDispatcher @@ -114,7 +116,7 @@ And then using it: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher -Note that ``thread-pool-executor`` configuration as per the above ``my-thread-pool-dispatcher`` exmaple is +Note that ``thread-pool-executor`` configuration as per the above ``my-thread-pool-dispatcher`` example is NOT applicable. This is because every actor will have its own thread pool when using ``PinnedDispatcher``, and that pool will have only one thread. diff --git a/akka-docs/scala/howto.rst b/akka-docs/scala/howto.rst new file mode 100644 index 0000000000..9436480327 --- /dev/null +++ b/akka-docs/scala/howto.rst @@ -0,0 +1,33 @@ + +.. _howto-scala: + +###################### +HowTo: Common Patterns +###################### + +This section lists common actor patterns which have been found to be useful, +elegant or instructive. Anything is welcome, example topics being message +routing strategies, supervision patterns, restart handling, etc. As a special +bonus, additions to this section are marked with the contributor’s name, and it +would be nice if every Akka user who finds a recurring pattern in his or her +code could share it for the profit of all. Where applicable it might also make +sense to add to the ``akka.pattern`` package for creating an `OTP-like library +`_. + +Template Pattern +================ + +*Contributed by: N. N.* + +This is an especially nice pattern, since it does even come with some empty example code: + +.. includecode:: code/docs/pattern/ScalaTemplate.scala + :include: all-of-it + :exclude: uninteresting-stuff + +.. note:: + + Spread the word: this is the easiest way to get famous! + +Please keep this pattern at the end of this file. + diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index fc1b619e26..ddceb9fcf8 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -28,3 +28,4 @@ Scala API zeromq microkernel camel + howto diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 4d434b2cab..5a37b3471a 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -380,9 +380,7 @@ The dispatcher for created children of the router will be taken from makes sense to configure the :class:`BalancingDispatcher` if the precise routing is not so important (i.e. no consistent hashing or round-robin is required); this enables newly created routees to pick up work immediately by -stealing it from their siblings. Note that you can **not** use a ``BalancingDispatcher`` -together with any kind of ``Router``, trying to do so will make your actor fail verification. - +stealing it from their siblings. .. note:: @@ -390,8 +388,8 @@ together with any kind of ``Router``, trying to do so will make your actor fail that was configured for them in their ``Props``, it is not possible to change an actors dispatcher after it has been created. -The “head” router, of course, cannot run on the same balancing dispatcher, -because it does not process the same messages, hence this special actor does +The “head” router cannot always run on the same dispatcher, because it +does not process the same type of messages, hence this special actor does not use the dispatcher configured in :class:`Props`, but takes the ``routerDispatcher`` from the :class:`RouterConfig` instead, which defaults to the actor system’s default dispatcher. All standard routers allow setting this @@ -400,3 +398,31 @@ implement the method in a suitable way. .. includecode:: code/docs/routing/RouterDocSpec.scala#dispatchers +.. note:: + + It is not allowed to configure the ``routerDispatcher`` to be a + :class:`BalancingDispatcher` since the messages meant for the special + router actor cannot be processed by any other actor. + +At first glance there seems to be an overlap between the +:class:`BalancingDispatcher` and Routers, but they complement each other. +The balancing dispatcher is in charge of running the actors while the routers +are in charge of deciding which message goes where. A router can also have +children that span multiple actor systems, even remote ones, but a dispatcher +lives inside a single actor system. + +When using a :class:`RoundRobinRouter` with a :class:`BalancingDispatcher` +there are some configuration settings to take into account. + +- There can only be ``nr-of-instances`` messages being processed at the same + time no matter how many threads are configured for the + :class:`BalancingDispatcher`. + +- Having ``throughput`` set to a low number makes no sense since you will only + be handing off to another actor that processes the same :class:`MailBox` + as yourself, which can be costly. Either the message just got into the + mailbox and you can receive it as well as anybody else, or everybody else + is busy and you are the only one available to receive the message. + +- Resizing the number of routees only introduce inertia, since resizing + is performed at specified intervals, but work stealing is instantaneous. diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index b8bce31708..f66e120195 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -84,7 +84,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(B, AddressFromURIString("akka://sys"), b.ref) a.send(barrier, EnterBarrier("bar")) noMsg(a, b) - within(1 second) { + within(2 second) { b.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -100,7 +100,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! NodeInfo(C, AddressFromURIString("akka://sys"), c.ref) b.send(barrier, EnterBarrier("bar")) noMsg(a, b, c) - within(1 second) { + within(2 second) { c.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -119,7 +119,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! RemoveClient(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) - b.within(1 second) { + b.within(2 second) { barrier ! RemoveClient(C) b.expectMsg(ToClient(BarrierResult("bar", true))) } @@ -265,7 +265,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with b.expectMsg(ToClient(Done)) a.send(barrier, EnterBarrier("bar")) noMsg(a, b) - within(1 second) { + within(2 second) { b.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -284,7 +284,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with c.expectMsg(ToClient(Done)) b.send(barrier, EnterBarrier("bar")) noMsg(a, b, c) - within(1 second) { + within(2 second) { c.send(barrier, EnterBarrier("bar")) a.expectMsg(ToClient(BarrierResult("bar", true))) b.expectMsg(ToClient(BarrierResult("bar", true))) @@ -306,7 +306,7 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with barrier ! Remove(A) barrier ! ClientDisconnected(A) noMsg(a, b, c) - b.within(1 second) { + b.within(2 second) { barrier ! Remove(C) b.expectMsg(ToClient(BarrierResult("bar", true))) } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 13c90ed61e..dbe9fbae9e 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -10,6 +10,8 @@ import com.typesafe.sbtmultijvm.MultiJvmPlugin import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions } import com.typesafe.sbtscalariform.ScalariformPlugin import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys +import com.typesafe.sbtosgi.OsgiPlugin.osgiSettings +import com.typesafe.sbtosgi.OsgiKeys import java.lang.Boolean.getBoolean import Sphinx.{ sphinxDocs, sphinxHtml, sphinxLatex, sphinxPdf, sphinxPygments, sphinxTags } @@ -44,7 +46,7 @@ object AkkaBuild extends Build { lazy val actor = Project( id = "akka-actor", base = file("akka-actor"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.actor ++ Seq( autoCompilerPlugins := true, libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) }, scalacOptions += "-P:continuations:enable", @@ -78,14 +80,14 @@ object AkkaBuild extends Build { id = "akka-remote", base = file("akka-remote"), dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"), - settings = defaultSettings ++ multiJvmSettings ++ Seq( + settings = defaultSettings ++ multiJvmSettings ++ OSGi.remote ++ Seq( libraryDependencies ++= Dependencies.remote, // disable parallel tests parallelExecution in Test := false, extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, - scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"), + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, jvmOptions in MultiJvm := defaultMultiJvmOptions, test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } ) @@ -101,7 +103,7 @@ object AkkaBuild extends Build { extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, - scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"), + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, jvmOptions in MultiJvm := defaultMultiJvmOptions, test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } ) @@ -111,14 +113,14 @@ object AkkaBuild extends Build { id = "akka-cluster", base = file("akka-cluster"), dependencies = Seq(remote, remoteTests % "compile;test->test;multi-jvm->multi-jvm", testkit % "test->test"), - settings = defaultSettings ++ multiJvmSettings ++ Seq( + settings = defaultSettings ++ multiJvmSettings ++ OSGi.cluster ++ Seq( libraryDependencies ++= Dependencies.cluster, // disable parallel tests parallelExecution in Test := false, extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, - scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"), + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, jvmOptions in MultiJvm := defaultMultiJvmOptions, test in Test <<= ((test in Test), (test in MultiJvm)) map { case x => x } ) @@ -128,7 +130,7 @@ object AkkaBuild extends Build { id = "akka-slf4j", base = file("akka-slf4j"), dependencies = Seq(actor, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.slf4j ++ Seq( libraryDependencies ++= Dependencies.slf4j ) ) @@ -137,7 +139,7 @@ object AkkaBuild extends Build { id = "akka-agent", base = file("akka-agent"), dependencies = Seq(actor, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.agent ++ Seq( libraryDependencies ++= Dependencies.agent ) ) @@ -146,7 +148,7 @@ object AkkaBuild extends Build { id = "akka-transactor", base = file("akka-transactor"), dependencies = Seq(actor, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.transactor ++ Seq( libraryDependencies ++= Dependencies.transactor ) ) @@ -164,7 +166,7 @@ object AkkaBuild extends Build { id = "akka-mailboxes-common", base = file("akka-durable-mailboxes/akka-mailboxes-common"), dependencies = Seq(remote, testkit % "compile;test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.mailboxesCommon ++ Seq( libraryDependencies ++= Dependencies.mailboxes, // DurableMailboxSpec published in akka-mailboxes-common-test publishArtifact in Test := true @@ -175,7 +177,7 @@ object AkkaBuild extends Build { id = "akka-file-mailbox", base = file("akka-durable-mailboxes/akka-file-mailbox"), dependencies = Seq(mailboxesCommon % "compile;test->test", testkit % "test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.fileMailbox ++ Seq( libraryDependencies ++= Dependencies.fileMailbox ) ) @@ -184,7 +186,7 @@ object AkkaBuild extends Build { id = "akka-zeromq", base = file("akka-zeromq"), dependencies = Seq(actor, testkit % "test;test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.zeroMQ ++ Seq( libraryDependencies ++= Dependencies.zeroMQ ) ) @@ -202,7 +204,7 @@ object AkkaBuild extends Build { id = "akka-camel", base = file("akka-camel"), dependencies = Seq(actor, slf4j, testkit % "test->test"), - settings = defaultSettings ++ Seq( + settings = defaultSettings ++ OSGi.camel ++ Seq( libraryDependencies ++= Dependencies.camel ) ) @@ -298,7 +300,7 @@ object AkkaBuild extends Build { val defaultExcludedTags = Seq("timing", "long-running") - val defaultMultiJvmOptions: Seq[String] = { + lazy val defaultMultiJvmOptions: Seq[String] = { (System.getProperty("akka.test.timefactor") match { case null => Nil case x => List("-Dakka.test.timefactor=" + x) @@ -306,6 +308,31 @@ object AkkaBuild extends Build { (if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil) } + // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec) + lazy val defaultExcludeTestNames: Seq[String] = { + val exclude = System.getProperty("akka.test.names.exclude", "") + if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq + } + + // for excluding tests by tag (or use system property: -Dakka.test.tags.exclude=timing) + lazy val defaultExcludeTestTags: Seq[String] = { + val exclude = System.getProperty("akka.test.tags.exclude", "") + if (exclude.isEmpty) defaultExcludedTags else exclude.split(",").toSeq + } + + // for including tests by tag (or use system property: -Dakka.test.tags.include=timing) + lazy val defaultIncludeTestTags: Seq[String] = { + val include = System.getProperty("akka.test.tags.include", "") + if (include.isEmpty) Seq.empty else include.split(",").toSeq + } + + lazy val defaultMultiJvmScalatestOptions: Seq[String] = { + val excludeTags = (defaultExcludeTestTags.toSet -- defaultIncludeTestTags.toSet).toSeq + Seq("-r", "org.scalatest.akka.QuietReporter") ++ + (if (excludeTags.isEmpty) Seq.empty else Seq("-l", excludeTags.mkString(" "))) ++ + (if (defaultIncludeTestTags.isEmpty) Seq.empty else Seq("-n", defaultIncludeTestTags.mkString(" "))) + } + lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq( resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/", @@ -318,23 +345,9 @@ object AkkaBuild extends Build { parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean, - // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec) - excludeTestNames := { - val exclude = System.getProperty("akka.test.names.exclude", "") - if (exclude.isEmpty) Seq.empty else exclude.split(",").toSeq - }, - - // for excluding tests by tag (or use system property: -Dakka.test.tags.exclude=timing) - excludeTestTags := { - val exclude = System.getProperty("akka.test.tags.exclude", "") - if (exclude.isEmpty) defaultExcludedTags else exclude.split(",").toSeq - }, - - // for including tests by tag (or use system property: -Dakka.test.tags.include=timing) - includeTestTags := { - val include = System.getProperty("akka.test.tags.include", "") - if (include.isEmpty) Seq.empty else include.split(",").toSeq - }, + excludeTestNames := defaultExcludeTestNames, + excludeTestTags := defaultExcludeTestTags, + includeTestTags := defaultIncludeTestTags, // add filters for tests excluded by name testOptions in Test <++= excludeTestNames map { _.map(exclude => Tests.Filter(test => !test.contains(exclude))) }, @@ -457,3 +470,38 @@ object Dependency { val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 } } + +// OSGi settings + +object OSGi { + + val actor = exports(Seq("akka*")) + + val agent = exports(Seq("akka.agent.*")) + + val camel = exports(Seq("akka.camel.*", "akka.camelexamples")) + + val cluster = exports(Seq("akka.cluster.*")) + + val fileMailbox = exports(Seq("akka.actor.mailbox.*")) + + val mailboxesCommon = exports(Seq("akka.actor.mailbox.*")) + + val remote = exports(Seq("akka.remote.*", "akka.routing.*", "akka.serialization.*")) + + val slf4j = exports(Seq("akka.event.slf4j.*")) + + val transactor = exports(Seq("akka.transactor.*")) + + val zeroMQ = exports(Seq("akka.zeromq.*")) + + def exports(packages: Seq[String]) = osgiSettings ++ Seq( + OsgiKeys.importPackage := Seq("!sun.misc", akkaImport(), configImport(), scalaImport(), "*"), + OsgiKeys.exportPackage := packages + ) + + def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,2.2)\"".format(packageName) + def configImport(packageName: String = "com.typesafe.config.*") = "%s;version=\"[0.4,0.5)\"".format(packageName) + def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.9.2,2.10)\"".format(packageName) + +} diff --git a/project/plugins.sbt b/project/plugins.sbt index ca63fa3dc2..45c8e41913 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,6 +5,8 @@ addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M1") addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.4.0") +addSbtPlugin("com.typesafe.sbtosgi" % "sbtosgi" % "0.2.0") + resolvers ++= Seq( // needed for sbt-assembly, which comes with sbt-multi-jvm Resolver.url("sbtonline", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns),