Merge branch 'master' into wip-2218-test-conductor-barrier-timeouts

Björn Antonsson 2012-06-19 15:11:50 +02:00
commit 4a56f195fc
38 changed files with 1356 additions and 557 deletions

View file

@@ -0,0 +1,17 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
public class NonPublicClass {
public static Props createProps() {
return new Props(MyNonPublicActorClass.class);
}
}
class MyNonPublicActorClass extends UntypedActor {
@Override public void onReceive(Object msg) {
getSender().tell(msg);
}
}

View file

@@ -358,6 +358,13 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
system.stop(serverRef)
}
"support actorOfs where the class of the actor isn't public" in {
val a = system.actorOf(NonPublicClass.createProps())
a.tell("pigdog", testActor)
expectMsg("pigdog")
system stop a
}
"stop when sent a poison pill" in {
val timeout = Timeout(20000)
val ref = system.actorOf(Props(new Actor {

View file

@@ -9,7 +9,6 @@ package akka
* <ul>
* <li>a uuid for tracking purposes</li>
* <li>toString that includes exception name, message and uuid</li>
* <li>toLongString which also includes the stack trace</li>
* </ul>
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed

View file

@@ -7,7 +7,6 @@ package akka.actor
import akka.AkkaException
import scala.reflect.BeanProperty
import scala.util.control.NoStackTrace
import scala.collection.immutable.Stack
import java.util.regex.Pattern
/**
@@ -279,18 +278,14 @@ trait Actor {
*/
protected[akka] implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
def noContextError =
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw new ActorInitializationException(
"\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." +
"\n\tYou have to use one of the factory methods to create a new actor. Either use:" +
"\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" +
"\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)")
if (contextStack.isEmpty) noContextError
val c = contextStack.head
if (c eq null) noContextError
ActorCell.contextStack.set(contextStack.push(null))
ActorCell.contextStack.set(null :: contextStack)
c
}

View file

@@ -13,7 +13,7 @@ import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.event.Logging.LogEventException
import collection.immutable.{ TreeSet, Stack, TreeMap }
import collection.immutable.{ TreeSet, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
//TODO: everything here for current compatibility - could be limited more
@@ -173,8 +173,8 @@ trait UntypedActorContext extends ActorContext {
* for! (waves hand)
*/
private[akka] object ActorCell {
val contextStack = new ThreadLocal[Stack[ActorContext]] {
override def initialValue = Stack[ActorContext]()
val contextStack = new ThreadLocal[List[ActorContext]] {
override def initialValue: List[ActorContext] = Nil
}
final val emptyCancellable: Cancellable = new Cancellable {
@@ -184,7 +184,7 @@ private[akka] object ActorCell {
final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable)
final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior)
final val emptyBehaviorStack: List[Actor.Receive] = Nil
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
@@ -408,7 +408,7 @@ private[akka] class ActorCell(
var currentMessage: Envelope = _
var actor: Actor = _
private var behaviorStack: Stack[Actor.Receive] = Stack.empty
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
@volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
var nextNameSequence: Long = 0
var watching: Set[ActorRef] = emptyActorRefSet
@@ -511,25 +511,21 @@ private[akka] class ActorCell(
//This method is in charge of setting up the contextStack and creating a new instance of the Actor
protected def newActor(): Actor = {
contextStack.set(contextStack.get.push(this))
contextStack.set(this :: contextStack.get)
try {
import ActorCell.behaviorStackPlaceHolder
behaviorStack = behaviorStackPlaceHolder
behaviorStack = emptyBehaviorStack
val instance = props.creator.apply()
if (instance eq null)
throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")
behaviorStack = behaviorStack match {
case `behaviorStackPlaceHolder` ⇒ Stack.empty.push(instance.receive)
case newBehaviors ⇒ Stack.empty.push(instance.receive).pushAll(newBehaviors.reverse.drop(1))
}
// If no becomes were issued, the actor's behavior is its receive method
behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
instance
} finally {
val stackAfter = contextStack.get
if (stackAfter.nonEmpty)
contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context
contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
}
}
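A hypothetical standalone model of the List-based context-stack discipline above (illustrative names, not Akka's): the creator pushes its context, the constructor consumes the head and leaves a null marker, and the creator finally pops both the marker and its own entry.

object ContextStackModel {
  val contextStack = new ThreadLocal[List[String]] {
    override def initialValue: List[String] = Nil
  }

  // stands in for the Actor constructor: reads the context, leaves a null marker
  class FakeActor {
    val context: String = contextStack.get.head // throws for a stray 'new' outside create
    contextStack.set(null :: contextStack.get)
  }

  def create(ctx: String): FakeActor = {
    contextStack.set(ctx :: contextStack.get) // push our context
    try new FakeActor
    finally {
      val stack = contextStack.get
      // pop the null marker (if the constructor ran) plus our own context
      contextStack.set(if (stack.head eq null) stack.tail.tail else stack.tail)
    }
  }
}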
@@ -683,10 +679,8 @@ private[akka] class ActorCell(
}
}
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = {
if (discardOld) unbecome()
behaviorStack = behaviorStack.push(behavior)
}
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)
/**
* UntypedActorContext impl
@@ -701,8 +695,9 @@ private[akka] class ActorCell(
def unbecome(): Unit = {
val original = behaviorStack
val popped = original.pop
behaviorStack = if (popped.isEmpty) original else popped
behaviorStack =
if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
else original.tail
}
def autoReceiveMessage(msg: Envelope): Unit = {
@@ -761,7 +756,7 @@ private[akka] class ActorCell(
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped"))
} finally {
behaviorStack = behaviorStackPlaceHolder
behaviorStack = emptyBehaviorStack
clearActorFields(a)
actor = null
}
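The new become/unbecome semantics can be summarized in a small self-contained model (a simplified sketch, not Akka's actual API): become pushes, optionally replacing the head; unbecome pops, but falls back to the plain receive behavior instead of emptying the stack.

// List-as-stack behavior management, mirroring the diff above
final class BehaviorStack[B](receive: B) {
  private var stack: List[B] = Nil

  def become(behavior: B, discardOld: Boolean = true): Unit =
    stack = behavior :: (if (discardOld && stack.nonEmpty) stack.tail else stack)

  def unbecome(): Unit =
    stack = if (stack.isEmpty || stack.tail.isEmpty) receive :: Nil else stack.tail

  def current: B = if (stack.isEmpty) receive else stack.head
}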

View file

@@ -13,7 +13,6 @@ import java.io.Closeable
import akka.dispatch.Await.{ Awaitable, CanAwait }
import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import collection.immutable.Stack
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
@@ -430,7 +429,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-]*$"""))
throw new IllegalArgumentException(
"invalid ActorSystem name [" + name +
"], must contain only word characters (i.e. [a-zA-Z_0-9] plus non-leading '-')")
"], must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-')")
import ActorSystem._
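For reference, a few hypothetical names checked against this pattern:

object NameCheck extends App {
  val valid = """^[a-zA-Z0-9][a-zA-Z0-9-]*$"""
  assert("my-cluster-1".matches(valid)) // alphanumeric plus non-leading '-'
  assert(!"my_system".matches(valid))   // '_' was never actually accepted by the pattern
  assert(!"-system".matches(valid))     // no leading '-'
}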
@@ -685,8 +684,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
final class TerminationCallbacks extends Runnable with Awaitable[Unit] {
private val lock = new ReentrantGuard
private var callbacks: Stack[Runnable] = _ //non-volatile since guarded by the lock
lock withGuard { callbacks = Stack.empty[Runnable] }
private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock
lock withGuard { callbacks = Nil }
private val latch = new CountDownLatch(1)
@@ -695,17 +694,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
case 0 ⇒ throw new RejectedExecutionException("Must be called prior to system shutdown.")
case _ ⇒ lock withGuard {
if (latch.getCount == 0) throw new RejectedExecutionException("Must be called prior to system shutdown.")
else callbacks = callbacks.push(callback)
else callbacks ::= callback
}
}
}
final def run(): Unit = lock withGuard {
@tailrec def runNext(c: Stack[Runnable]): Stack[Runnable] = c.headOption match {
case None ⇒ Stack.empty[Runnable]
case Some(callback) ⇒
try callback.run() catch { case e ⇒ log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) }
runNext(c.pop)
@tailrec def runNext(c: List[Runnable]): List[Runnable] = c match {
case Nil ⇒ Nil
case callback :: rest ⇒
try callback.run() catch { case NonFatal(e) ⇒ log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) }
runNext(rest)
}
try { callbacks = runNext(callbacks) } finally latch.countDown()
}
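Note that because callbacks are prepended with `::`, run() still executes them in reverse registration order (LIFO), matching the previous Stack behavior. A self-contained sketch of the same drain loop (hypothetical names, outside Akka):

object CallbackDrain {
  import scala.annotation.tailrec

  // Drain a List of callbacks tail-recursively; a failure in one callback
  // must not prevent the remaining callbacks from running.
  @tailrec def runAll(callbacks: List[Runnable]): Unit = callbacks match {
    case Nil ⇒ ()
    case callback :: rest ⇒
      try callback.run() catch { case e: Exception ⇒ println("callback failed: " + e) }
      runAll(rest)
  }
}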

View file

@@ -6,7 +6,6 @@ package akka.actor
import akka.dispatch._
import akka.japi.Creator
import collection.immutable.Stack
import akka.routing._
/**
@@ -186,5 +185,10 @@ case class Props(
* able to optimize serialization.
*/
private[akka] case class FromClassCreator(clazz: Class[_ <: Actor]) extends Function0[Actor] {
def apply(): Actor = clazz.newInstance
def apply(): Actor = try clazz.newInstance catch {
case iae: IllegalAccessException ⇒
val ctor = clazz.getDeclaredConstructor()
ctor.setAccessible(true)
ctor.newInstance()
}
}
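A generic sketch of the same reflective fallback, independent of Akka (the helper name is invented for illustration):

object ReflectiveCreate {
  // Instantiate a class whose no-arg constructor is not accessible from here.
  def newInstanceOf[T](clazz: Class[T]): T =
    try clazz.newInstance catch {
      case _: IllegalAccessException ⇒
        val ctor = clazz.getDeclaredConstructor()
        ctor.setAccessible(true) // bypass Java language access checks
        ctor.newInstance()
    }
}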

View file

@@ -165,8 +165,7 @@ class AccrualFailureDetector(
else PhiFactor * timestampDiff / mean
}
// FIXME change to debug log level, when failure detector is stable
log.info("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
phi
}
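For context, the φ value logged above is computed as PhiFactor * timestampDiff / mean; under an exponential model of heartbeat inter-arrival times this corresponds to φ = -log10(P(heartbeat arrives later than t)) = t / (mean * ln 10). A minimal sketch, assuming PhiFactor = 1 / ln 10 (an assumption consistent with that formula, not confirmed by this diff):

object PhiExample extends App {
  val PhiFactor = 1.0 / math.log(10.0) // assumed: follows from the formula above
  def phi(timestampDiff: Long, mean: Double): Double =
    PhiFactor * timestampDiff / mean

  // heartbeats every 1000 ms on average, 3000 ms since the last one:
  println(phi(3000, 1000.0)) // ≈ 1.30, crossing a threshold of 1.3
}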

View file

@@ -6,27 +6,27 @@ package akka.cluster
import akka.actor._
import akka.actor.Status._
import akka.ConfigurationException
import akka.dispatch.Await
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.jsr166y.ThreadLocalRandom
import akka.pattern.ask
import akka.remote._
import akka.routing._
import akka.event.Logging
import akka.dispatch.Await
import akka.pattern.ask
import akka.util._
import akka.util.duration._
import akka.ConfigurationException
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException
import akka.jsr166y.ThreadLocalRandom
import java.lang.management.ManagementFactory
import java.io.Closeable
import javax.management._
import scala.collection.immutable.{ Map, SortedSet }
import scala.annotation.tailrec
import com.google.protobuf.ByteString
import akka.util.internal.HashedWheelTimer
import akka.dispatch.MonitorableThreadFactory
import com.google.protobuf.ByteString
import java.io.Closeable
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit._
import javax.management._
import MemberStatus._
import scala.annotation.tailrec
import scala.collection.immutable.{ Map, SortedSet }
/**
* Interface for membership change listener.
@@ -52,7 +52,7 @@ sealed trait ClusterMessage extends Serializable
/**
* Cluster commands sent by the USER.
*/
object ClusterAction {
object ClusterUserAction {
/**
* Command to join the cluster. Sent when a node (represented by 'address')
@@ -69,22 +69,33 @@ object ClusterAction {
* Command to mark a node as temporarily down.
*/
case class Down(address: Address) extends ClusterMessage
}
/**
* Command to remove a node from the cluster immediately.
* Cluster commands sent by the LEADER.
*/
case class Remove(address: Address) extends ClusterMessage
object ClusterLeaderAction {
/**
* INTERNAL API.
*
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
*/
private[akka] case class Exit(address: Address) extends ClusterMessage
private[cluster] case class Exit(address: Address) extends ClusterMessage
/**
* INTERNAL API.
*
* Command to remove a node from the cluster immediately.
*/
private[cluster] case class Remove(address: Address) extends ClusterMessage
}
/**
* Represents the address and the current status of a cluster member node.
*
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
*/
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
override def hashCode = address.##
@@ -94,12 +105,12 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
}
/**
* Factory and Utility module for Member instances.
* Module with factory and ordering methods for Member instances.
*/
object Member {
/**
* Sort Address by host and port
* `Address` ordering type class, sorts addresses by host and port.
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
@@ -107,8 +118,14 @@ object Member {
else false
}
implicit val ordering: Ordering[Member] = new Ordering[Member] {
def compare(x: Member, y: Member) = addressOrdering.compare(x.address, y.address)
/**
* `Member` ordering type class, sorts members by host and port with the exception that
* it puts all members that are in MemberStatus.EXITING last.
*/
implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
if (a.status == Exiting && b.status != Exiting) false
else if (a.status != Exiting && b.status == Exiting) true
else addressOrdering.compare(a.address, b.address) < 0
}
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
@@ -118,6 +135,15 @@ object Member {
case _ ⇒ None
}
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf)
}
}
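A hypothetical illustration of the merge helper above (placeholder address; note that Member equality is address-based, so the status is checked explicitly):

object MergeExample extends App {
  import akka.actor.Address
  import akka.cluster.{ Member, MemberStatus }
  import MemberStatus._

  val addr = Address("akka", "sys", "somehost", 2552) // placeholder
  // The same node is Up on one side and Leaving on the other side of the merge;
  // the more advanced status (Leaving) wins.
  val merged = Member.pickHighestPriority(Set(Member(addr, Up)), Set(Member(addr, Leaving)))
  assert(merged.head.status == Leaving)
}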
/**
* Picks the Member with the highest "priority" MemberStatus.
*/
@@ -130,8 +156,8 @@
case (_, Exiting) ⇒ m2
case (Leaving, _) ⇒ m1
case (_, Leaving) ⇒ m2
case (Up, Joining) ⇒ m1
case (Joining, Up) ⇒ m2
case (Up, Joining) ⇒ m2
case (Joining, Up) ⇒ m1
case (Joining, Joining) ⇒ m1
case (Up, Up) ⇒ m1
}
@@ -148,10 +174,11 @@ case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage
* Can be one of: Joining, Up, Leaving, Exiting and Down.
*/
sealed trait MemberStatus extends ClusterMessage {
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED.
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
*/
def isUnavailable: Boolean = this == Down || this == Removed
def isUnavailable: Boolean = this == Down
}
object MemberStatus {
@@ -217,6 +244,7 @@ case class Gossip(
// FIXME can be disabled as optimization
assertInvariants
private def assertInvariants: Unit = {
val unreachableAndLive = members.intersect(overview.unreachable)
if (unreachableAndLive.nonEmpty)
@@ -242,14 +270,17 @@
*/
def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)
/**
* Adds a member to the member node ring.
*/
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
}
/**
* Marks the gossip as seen by this node (selfAddress) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock for the new gossip.
* Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock (version) for the new gossip.
*/
def seen(address: Address): Gossip = {
if (overview.seen.contains(address) && overview.seen(address) == version) this
@@ -268,22 +299,12 @@ case class Gossip(
// 2. merge meta-data
val mergedMeta = this.meta ++ that.meta
def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a ++ b).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf)
}
}
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq)
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq).
filterNot(mergedUnreachable.contains)
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
@@ -306,11 +327,14 @@ case class Gossip(
case class Heartbeat(from: Address) extends ClusterMessage
/**
* INTERNAL API.
*
* Manages routing of the different cluster commands.
* Instantiated as a single instance for each Cluster - i.e. commands are serialized and handled by the Cluster one message at a time.
*/
private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
import ClusterAction._
private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
import ClusterUserAction._
import ClusterLeaderAction._
val log = Logging(context.system, this)
@@ -326,10 +350,12 @@ private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
}
/**
* INTERNAL API.
*
* Pooled and routed with a configurable number of instances.
* Concurrent access to Cluster.
*/
private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
private[cluster] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
def receive = {
@@ -341,9 +367,11 @@ private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
}
/**
* INTERNAL API.
*
* Supervisor managing the different Cluster daemons.
*/
private[akka] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands")
@@ -396,13 +424,11 @@ trait ClusterNodeMBean {
def isSingleton: Boolean
def isConvergence: Boolean
def isAvailable: Boolean
def isRunning: Boolean
def join(address: String)
def leave(address: String)
def down(address: String)
def remove(address: String)
def shutdown()
}
/**
@@ -455,7 +481,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private val serialization = remote.serialization
private val isRunning = new AtomicBoolean(true)
private val _isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Node")
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
@@ -522,22 +548,26 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
// start periodic gossip to random nodes in cluster
private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) {
private val gossipTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
gossip()
}
// start periodic heartbeat to all nodes in cluster
private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) {
private val heartbeatTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
heartbeat()
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
private val failureDetectorReaperTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
reapUnreachableMembers()
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) {
private val leaderActionsTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
leaderActions()
}
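The `PeriodicTasksInitialDelay.max(interval)` pattern above guarantees the first tick never fires earlier than one full interval, even if the configured initial delay is shorter. A tiny illustration (values are made up):

object InitialDelayExample extends App {
  import akka.util.duration._

  val initialDelay = 1.second
  val interval = 5.seconds
  // never fire the first tick earlier than one full interval
  println(initialDelay.max(interval)) // 5 seconds
}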
@@ -562,15 +592,27 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
/**
* Returns true if the cluster node is up and running, false if it is shut down.
*/
def isRunning: Boolean = _isRunning.get
/**
* Latest gossip.
*/
def latestGossip: Gossip = state.get.latestGossip
/**
* Member status for this node.
* Member status for this node (`MemberStatus`).
*
* NOTE: If the node has been removed from the cluster (and shut down) then its status is set to the 'REMOVED' tombstone state
* and it is no longer present in the node ring or any other part of the gossiping state. However, in order to maintain the
* model and the semantics the user would expect, this method will in that situation return `MemberStatus.Removed`.
*/
def status: MemberStatus = self.status
def status: MemberStatus = {
if (isRunning) self.status
else MemberStatus.Removed
}
/**
* Is this node the leader?
@@ -602,33 +644,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
def isAvailable: Boolean = !isUnavailable(state.get)
/**
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*/
def shutdown(): Unit = {
if (isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
// cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
clusterScheduler.close()
// FIXME isTerminated check can be removed when ticket #2221 is fixed
// now it prevents logging if system is shutdown (or in progress of shutdown)
if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons)
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {
case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
}
}
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@@ -657,7 +672,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
def join(address: Address): Unit = {
val connection = clusterCommandConnectionFor(address)
val command = ClusterAction.Join(selfAddress)
val command = ClusterUserAction.Join(selfAddress)
log.info("Cluster Node [{}] - Trying to send JOIN to [{}] through connection [{}]", selfAddress, address, connection)
connection ! command
}
@@ -666,21 +681,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Send command to issue state transition to LEAVING for the node specified by 'address'.
*/
def leave(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Leave(address)
clusterCommandDaemon ! ClusterUserAction.Leave(address)
}
/**
* Send command to issue state transition to from DOWN to EXITING for the node specified by 'address'.
* Send command to DOWN the node specified by 'address'.
*/
def down(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Down(address)
}
/**
* Send command to issue state transition to REMOVED for the node specified by 'address'.
*/
def remove(address: Address): Unit = {
clusterCommandDaemon ! ClusterAction.Remove(address)
clusterCommandDaemon ! ClusterUserAction.Down(address)
}
// ========================================================
@@ -688,22 +696,52 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ========================================================
/**
* State transition to JOINING.
* New node joining.
* INTERNAL API.
*
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*
* Should not be called by the user. The user can issue a LEAVE command, which will tell the node
* to go through the graceful handoff process `LEAVING -> EXITING -> REMOVED -> SHUTDOWN`.
*/
private[cluster] def shutdown(): Unit = {
if (_isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
// cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
clusterScheduler.close()
// FIXME isTerminated check can be removed when ticket #2221 is fixed
// now it prevents logging if system is shutdown (or in progress of shutdown)
if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons)
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {
case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
}
}
/**
* INTERNAL API.
*
* State transition to JOINING - new node joining.
*/
@tailrec
private[cluster] final def joining(node: Address): Unit = {
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val localUnreachable = localGossip.overview.unreachable
val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localUnreachable.exists { m ⇒
m.address == node && m.status != Down && m.status != Removed
}
val isUnreachable = localUnreachable.exists { m ⇒ m.address == node && m.status != Down }
if (!alreadyMember && !isUnreachable) {
@@ -721,6 +759,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) failureDetector heartbeat node
notifyMembershipChangeListeners(localState, newState)
@@ -729,17 +768,16 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API.
*
* State transition to LEAVING.
*/
@tailrec
private[cluster] final def leaving(address: Address) {
log.info("Cluster Node [{}] - Marking address [{}] as LEAVING", selfAddress, address)
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING
if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
@@ -749,32 +787,41 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (!state.compareAndSet(localState, newState)) leaving(address) // recur if we failed update
else {
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
notifyMembershipChangeListeners(localState, newState)
}
}
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
val oldMembersStatus = oldState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
val newMembersStatus = newState.latestGossip.members.toSeq.map(m ⇒ (m.address, m.status))
if (newMembersStatus != oldMembersStatus)
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
/**
* INTERNAL API.
*
* State transition to EXITING.
*/
private[cluster] final def exiting(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", selfAddress, address)
log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address)
// FIXME implement when we implement hand-off
}
/**
* INTERNAL API.
*
* State transition to REMOVED.
*
* This method is for now only called after the LEADER has sent a Removed message - telling the node
* to shut itself down.
*
* In the future we might change this to allow the USER to send a Removed(address) message telling an
* arbitrary node to be moved directly from UP -> REMOVED.
*/
private[cluster] final def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", selfAddress, address)
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
shutdown()
}
/**
* INTERNAL API.
*
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
*
@@ -832,6 +879,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API.
*
* Receive new gossip.
*/
@tailrec
@@ -845,9 +894,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip :+ vclockNode
// FIXME change to debug log level, when failure detector is stable
log.info(
"""Can't establish a causal relationship between "remote" gossip [{}] and "local" gossip [{}] - merging them into [{}]""",
log.debug(
"""Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
remoteGossip, localGossip, versionedMergedGossip)
versionedMergedGossip
@@ -872,7 +920,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API
* INTERNAL API.
*/
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
@@ -882,11 +930,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private def autoJoin(): Unit = nodeToJoin foreach join
/**
* INTERNAL API
* INTERNAL API.
*
* Gossips latest gossip to an address.
*/
private[akka] def gossipTo(address: Address): Unit = {
private[cluster] def gossipTo(address: Address): Unit = {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
connection ! GossipEnvelope(selfAddress, latestGossip)
@@ -906,18 +954,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API
* INTERNAL API.
*/
private[akka] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
(membersSize + unreachableSize) match {
case 0 ⇒ 0.0
case sum ⇒ unreachableSize.toDouble / sum
}
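A hypothetical standalone version of the same computation, with a worked example:

object GossipProbability extends App {
  def gossipToUnreachableProbability(membersSize: Int, unreachableSize: Int): Double =
    (membersSize + unreachableSize) match {
      case 0   ⇒ 0.0
      case sum ⇒ unreachableSize.toDouble / sum
    }

  // 8 reachable + 2 unreachable members: 2/10 = 0.2 chance of picking
  // an unreachable node as the gossip target this round
  assert(gossipToUnreachableProbability(8, 2) == 0.2)
}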
/**
* INTERNAL API
* INTERNAL API.
*/
private[akka] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
private[cluster] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
if (nrOfDeputyNodes > membersSize) 1.0
else if (nrOfDeputyNodes == 0) 0.0
else (membersSize + unreachableSize) match {
@@ -927,11 +975,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API
* INTERNAL API.
*
* Initiates a new round of gossip.
*/
private[akka] def gossip(): Unit = {
private[cluster] def gossip(): Unit = {
val localState = state.get
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
@@ -968,9 +1016,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API
* INTERNAL API.
*/
private[akka] def heartbeat(): Unit = {
private[cluster] def heartbeat(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState)) {
@@ -985,12 +1033,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API
* INTERNAL API.
*
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
@tailrec
final private[akka] def reapUnreachableMembers(): Unit = {
final private[cluster] def reapUnreachableMembers(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState) && isAvailable(localState)) {
@@ -1029,124 +1077,187 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* INTERNAL API
* INTERNAL API.
*
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
*/
@tailrec
final private[akka] def leaderActions(): Unit = {
final private[cluster] def leaderActions(): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
// FIXME implement partition handoff and a check whether it is completed - for now just returns TRUE, i.e. has completed successfully
def hasPartionHandoffCompletedSuccessfully(gossip: Gossip): Boolean = {
true
}
if (isLeader && isAvailable(localState)) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partition handoff and a check whether it is completed - for now just returns TRUE, i.e. has completed successfully
true
}
// Leader actions are as follows:
// 1. Move JOINING => UP -- When a node joins the cluster
// 2. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence)
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 2. Move JOINING => UP -- When a node joins the cluster
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Updating the vclock version for the changes
// 6. Updating the 'seen' table
// 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table
// 8. Try to update the state with the new gossip
// 9. If failure - retry
// 10. If success - run all the side-effecting processing
var hasChangedState = false
val newGossip =
val (
newGossip: Gossip,
hasChangedState: Boolean,
upMembers,
exitingMembers,
removedMembers,
unreachableButNotDownedMembers) =
if (convergence(localGossip).isDefined) {
// we have convergence - so we can't have unreachable nodes
// transform the node member ring - filterNot/map/map
val newMembers =
localMembers map { member ⇒
localMembers filterNot { member ⇒
// ----------------------
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
if (member.status == Joining) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address)
hasChangedState = true
member copy (status = Up)
} else member
member.status == MemberStatus.Exiting
} map { member ⇒
// ----------------------
// 2. Move EXITING => REMOVED (once all nodes have seen that this node is EXITING e.g. we have a convergence)
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == Exiting) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", selfAddress, member.address)
hasChangedState = true
member copy (status = Removed)
} else member
if (member.status == Joining) member copy (status = Up)
else member
} map { member ⇒
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully(localGossip)) {
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, member.address)
hasChangedState = true
member copy (status = Exiting)
} else member
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting)
else member
}
localGossip copy (members = newMembers) // update gossip
// ----------------------
// 5. Store away all stuff needed for the side-effecting processing in 10.
// ----------------------
// Check for the need to do side-effecting on successful state change
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the 'seen' table
val newSeen = localSeen -- removedMembers.map(_.address)
// removing REMOVED nodes from the 'unreachable' set
val newUnreachableMembers = localUnreachableMembers -- removedMembers
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member])
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers.map { member ⇒
// ----------------------
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// 5. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
val newUnreachableMembers =
localUnreachableMembers.map { member ⇒
// no need to DOWN members already DOWN
if (member.status == Down) member
else {
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
hasChangedState = true
member copy (status = Down)
}
if (member.status == Down) member // no need to DOWN members already DOWN
else member copy (status = Down)
}
// Check for the need to do side-effecting on successful state change
val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down)
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect {
case m if m.status == Down ⇒ m.address
}
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address }
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
localGossip copy (overview = newOverview) // update gossip
val newGossip = localGossip copy (overview = newOverview) // update gossip
} else localGossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers)
} else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member])
if (hasChangedState) { // we have a change of state - version it and try to update
// ----------------------
// 5. Updating the vclock version for the changes
// 6. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// 6. Updating the 'seen' table
// 7. Updating the 'seen' table
// Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED
// ----------------------
val seenVersionedGossip = versionedGossip seen selfAddress
val seenVersionedGossip =
if (removedMembers.exists(_.address == selfAddress)) versionedGossip
else versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) leaderActions() // recur
else {
// ----------------------
// 8. Try to update the state with the new gossip
// ----------------------
if (!state.compareAndSet(localState, newState)) {
// ----------------------
// 9. Failure - retry
// ----------------------
leaderActions() // recur
} else {
// ----------------------
// 10. Success - run all the side-effecting processing
// ----------------------
// if (removedMembers.exists(_.address == selfAddress)) {
// // we now know that this node (the leader) is just about to shut down since it will be moved from EXITING -> REMOVED
// // so now let's gossip out this information directly since there will not be any other chance
// gossip()
// }
// log the move of members from joining to up
upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) }
// tell all removed members to remove and shut down themselves
removedMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
clusterCommandConnectionFor(address) ! ClusterLeaderAction.Remove(address)
}
// tell all exiting members to exit
exitingMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
clusterCommandConnectionFor(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff?
}
// log the auto-downing of the unreachable nodes
unreachableButNotDownedMembers foreach { member ⇒
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
}
notifyMembershipChangeListeners(localState, newState)
}
}
@@ -1170,9 +1281,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
// and that all members exist in the seen table
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { m ⇒
m.status != Down && m.status != Removed
}
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
val allMembersInSeen = gossip.members.forall(m ⇒ seen.contains(m.address))
if (hasUnreachable) {
@@ -1201,14 +1310,18 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private def isUnavailable(state: State): Boolean = {
val localGossip = state.latestGossip
val localOverview = localGossip.overview
val localMembers = localGossip.members
val localUnreachableMembers = localOverview.unreachable
val isUnreachable = localUnreachableMembers exists { _.address == selfAddress }
val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && m.status.isUnavailable }
val isUnreachable = localGossip.overview.unreachable exists { _.address == selfAddress }
val hasUnavailableMemberStatus = localGossip.members exists { m ⇒ (m == self) && m.status.isUnavailable }
isUnreachable || hasUnavailableMemberStatus
}
private def notifyMembershipChangeListeners(oldState: State, newState: State): Unit = {
val oldMembersStatus = oldState.latestGossip.members.map(m ⇒ (m.address, m.status))
val newMembersStatus = newState.latestGossip.members.map(m ⇒ (m.address, m.status))
if (newMembersStatus != oldMembersStatus)
newState.memberMembershipChangeListeners foreach { _ notify newState.latestGossip.members }
}
/**
* Looks up and returns the local cluster command connection.
*/
@@ -1231,9 +1344,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
/**
* INTERNAL API
* INTERNAL API.
*/
private[akka] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
private[cluster] def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
@@ -1276,6 +1389,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
def isAvailable: Boolean = clusterNode.isAvailable
def isRunning: Boolean = clusterNode.isRunning
// JMX commands
def join(address: String) = clusterNode.join(AddressFromURIString(address))
@@ -1283,10 +1398,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
def down(address: String) = clusterNode.down(AddressFromURIString(address))
def remove(address: String) = clusterNode.remove(AddressFromURIString(address))
def shutdown() = clusterNode.shutdown()
}
log.info("Cluster Node [{}] - registering cluster JMX MBean [{}]", selfAddress, clusterMBeanName)
try {

View file

@@ -39,7 +39,7 @@ abstract class ConvergenceSpec
"A cluster of 3 members" must {
"reach initial convergence" taggedAs LongRunningTest ignore {
"reach initial convergence" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
runOn(fourth) {
@@ -49,7 +49,7 @@ abstract class ConvergenceSpec
enterBarrier("after-1")
}
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore {
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in {
val thirdAddress = node(third).address
enterBarrier("before-shutdown")
@@ -81,7 +81,7 @@ abstract class ConvergenceSpec
enterBarrier("after-2")
}
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore {
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in {
runOn(fourth) {
// try to join
cluster.join(node(first).address)

View file

@@ -17,7 +17,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
val c1 = role("c1")
val c2 = role("c2")
commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig))
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy

View file

@@ -0,0 +1,87 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task interval to make sure we get a chance to test the LEAVING state
unreachable-nodes-reaper-interval = 30 s
}""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
abstract class LeaderLeavingSpec
extends MultiNodeSpec(LeaderLeavingMultiJvmSpec)
with MultiNodeClusterSpec {
import LeaderLeavingMultiJvmSpec._
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
lazy val thirdAddress = node(third).address
"A LEADER that is LEAVING" must {
"be moved to LEAVING, then to EXITING, then to REMOVED, then be shut down and then a new LEADER should be elected" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
val oldLeaderAddress = cluster.leader
if (cluster.isLeader) {
cluster.leave(oldLeaderAddress)
testConductor.enter("leader-left")
// verify that a NEW LEADER has taken over
awaitCond(!cluster.isLeader)
// verify that the LEADER is shut down
awaitCond(!cluster.isRunning, 30.seconds.dilated)
// verify that the LEADER is REMOVED
awaitCond(cluster.status == MemberStatus.Removed)
} else {
testConductor.enter("leader-left")
// verify that the LEADER is LEAVING
awaitCond(cluster.latestGossip.members.exists(m => m.status == MemberStatus.Leaving && m.address == oldLeaderAddress)) // wait on LEAVING
// verify that the LEADER is EXITING
awaitCond(cluster.latestGossip.members.exists(m => m.status == MemberStatus.Exiting && m.address == oldLeaderAddress)) // wait on EXITING
// verify that the LEADER is no longer part of the 'members' set
awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress))
// verify that the LEADER is not part of the 'unreachable' set
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress))
// verify that we have a new LEADER
awaitCond(cluster.leader != oldLeaderAddress)
}
testConductor.enter("finished")
}
}
}

View file

@@ -21,7 +21,7 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task interval
unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))

View file

@@ -19,7 +19,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster.leader-actions-interval = 5 s
akka.cluster.unreachable-nodes-reaper-interval = 30 s
akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off"
"""))
.withFallback(MultiNodeClusterSpec.clusterConfig))
}

View file

@@ -11,6 +11,9 @@ import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import akka.util.Duration
import org.scalatest.Suite
import org.scalatest.TestFailedException
import scala.util.control.NoStackTrace
object MultiNodeClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString("""
@@ -29,10 +32,28 @@ object MultiNodeClusterSpec {
""")
}
trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec ⇒
trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec ⇒
override def initialParticipants = roles.size
// Cluster tests are written so that if the previous step (test method) failed
// it will most likely not be possible to run the next step. This ensures
// fail-fast behavior for the steps after the first failure.
private var failed = false
override protected def withFixture(test: NoArgTest): Unit = try {
if (failed) {
val e = new TestFailedException("Previous step failed", 0)
// short stack trace
e.setStackTrace(e.getStackTrace.take(1))
throw e
}
super.withFixture(test)
} catch {
case t ⇒
failed = true
throw t
}
/**
* The cluster node instance. Needs to be lazily created.
*/
@@ -151,6 +172,6 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec ⇒
}
def roleName(address: Address): Option[RoleName] = {
testConductor.getNodes.await.find(node(_).address == address)
roles.find(node(_).address == address)
}
}

View file

@@ -18,9 +18,9 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
abstract class NodeLeavingAndExitingAndBeingRemovedSpec
extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
@@ -36,8 +36,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
"A node that is LEAVING a non-singleton cluster" must {
// FIXME make it work and remove ignore
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest ignore {
"eventually set to REMOVED by the reaper, and removed from membership ring and seen table" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
@@ -50,13 +49,14 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
// verify that the 'second' node is no longer part of the 'members' set
awaitCond(cluster.latestGossip.members.forall(_.address != secondAddress), reaperWaitingTime)
// verify that the 'second' node is part of the 'unreachable' set
awaitCond(cluster.latestGossip.overview.unreachable.exists(_.status == MemberStatus.Removed), reaperWaitingTime)
// verify that the 'second' node is not part of the 'unreachable' set
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != secondAddress), reaperWaitingTime)
}
// verify node that got removed is 'second' node
val isRemoved = cluster.latestGossip.overview.unreachable.find(_.status == MemberStatus.Removed)
isRemoved must be('defined)
isRemoved.get.address must be(secondAddress)
runOn(second) {
// verify that the second node is shut down and has status REMOVED
awaitCond(!cluster.isRunning, reaperWaitingTime)
awaitCond(cluster.status == MemberStatus.Removed, reaperWaitingTime)
}
enterBarrier("finished")

View file

@@ -20,7 +20,7 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task interval to make sure we get a chance to test the LEAVING state
unreachable-nodes-reaper-interval = 30 s
unreachable-nodes-reaper-interval = 300 s # turn "off"
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
@@ -42,8 +42,7 @@ abstract class NodeLeavingAndExitingSpec
"A node that is LEAVING a non-singleton cluster" must {
// FIXME make it work and remove ignore
"be moved to EXITING by the leader" taggedAs LongRunningTest ignore {
"be moved to EXITING by the leader" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)

View file

@@ -36,8 +36,7 @@ abstract class NodeLeavingSpec
"A node that is LEAVING a non-singleton cluster" must {
// FIXME make it work and remove ignore
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest ignore {
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)

View file

@@ -21,18 +21,19 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.cluster {
gossip-interval = 400 ms
nr-of-deputy-nodes = 0
# FIXME remove this (use default) when ticket #2239 has been fixed
gossip-interval = 400 ms
}
akka.loglevel = INFO
"""))
}
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
abstract class SunnyWeatherSpec
extends MultiNodeSpec(SunnyWeatherMultiJvmSpec)

View file

@ -0,0 +1,435 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.Address
import akka.remote.testconductor.RoleName
import MemberStatus._
object TransitionMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString(
"akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy
abstract class TransitionSpec
extends MultiNodeSpec(TransitionMultiJvmSpec)
with MultiNodeClusterSpec {
import TransitionMultiJvmSpec._
// sorted in the order used by the cluster
def leader(roles: RoleName*) = roles.sorted.head
def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail
def memberStatus(address: Address): MemberStatus = {
val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst {
case m if m.address == address ⇒ m.status
}
statusOption must not be (None)
statusOption.get
}
def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address)
def members: Set[RoleName] = memberAddresses.flatMap(roleName(_))
def seenLatestGossip: Set[RoleName] = {
val gossip = cluster.latestGossip
gossip.overview.seen.collect {
case (address, v) if v == gossip.version ⇒ roleName(address)
}.flatten.toSet
}
def awaitSeen(addresses: Address*): Unit = awaitCond {
seenLatestGossip.map(node(_).address) == addresses.toSet
}
def awaitMembers(addresses: Address*): Unit = awaitCond {
memberAddresses == addresses.toSet
}
def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond {
memberStatus(address) == status
}
// implicit conversion from RoleName to Address
implicit def role2Address(role: RoleName): Address = node(role).address
// DSL sugar for `role1 gossipTo role2`
implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role)
var gossipBarrierCounter = 0
class RoleWrapper(fromRole: RoleName) {
def gossipTo(toRole: RoleName): Unit = {
gossipBarrierCounter += 1
runOn(toRole) {
val g = cluster.latestGossip
testConductor.enter("before-gossip-" + gossipBarrierCounter)
awaitCond(cluster.latestGossip != g) // received gossip
testConductor.enter("after-gossip-" + gossipBarrierCounter)
}
runOn(fromRole) {
testConductor.enter("before-gossip-" + gossipBarrierCounter)
cluster.gossipTo(node(toRole).address) // send gossip
testConductor.enter("after-gossip-" + gossipBarrierCounter)
}
runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) {
testConductor.enter("before-gossip-" + gossipBarrierCounter)
testConductor.enter("after-gossip-" + gossipBarrierCounter)
}
}
}
"A Cluster" must {
"start nodes as singleton clusters" taggedAs LongRunningTest in {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
cluster.leaderActions()
cluster.status must be(Up)
testConductor.enter("after-1")
}
"perform correct transitions when second joining first" taggedAs LongRunningTest in {
runOn(second) {
cluster.join(first)
}
runOn(first) {
awaitMembers(first, second)
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
cluster.convergence.isDefined must be(false)
}
testConductor.enter("second-joined")
first gossipTo second
runOn(second) {
members must be(Set(first, second))
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
// we got a conflicting version in second, and therefore no convergence in second
seenLatestGossip must be(Set(second))
cluster.convergence.isDefined must be(false)
}
second gossipTo first
runOn(first) {
seenLatestGossip must be(Set(first, second))
}
first gossipTo second
runOn(second) {
seenLatestGossip must be(Set(first, second))
}
runOn(first, second) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
cluster.convergence.isDefined must be(true)
}
testConductor.enter("convergence-joining-2")
runOn(leader(first, second)) {
cluster.leaderActions()
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
}
testConductor.enter("leader-actions-2")
leader(first, second) gossipTo nonLeader(first, second).head
runOn(nonLeader(first, second).head) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(true)
}
nonLeader(first, second).head gossipTo leader(first, second)
runOn(first, second) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(true)
}
testConductor.enter("after-2")
}
"perform correct transitions when third joins second" taggedAs LongRunningTest in {
runOn(third) {
cluster.join(second)
}
runOn(second) {
awaitMembers(first, second, third)
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(second))
}
testConductor.enter("third-joined-second")
second gossipTo first
runOn(first) {
members must be(Set(first, second, third))
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
}
first gossipTo third
runOn(third) {
members must be(Set(first, second, third))
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
// conflicting version
seenLatestGossip must be(Set(third))
}
third gossipTo first
third gossipTo second
runOn(first, second) {
seenLatestGossip must be(Set(myself, third))
}
first gossipTo second
runOn(second) {
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
runOn(first, third) {
cluster.convergence.isDefined must be(false)
}
second gossipTo first
second gossipTo third
runOn(first, second, third) {
seenLatestGossip must be(Set(first, second, third))
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Joining)
cluster.convergence.isDefined must be(true)
}
testConductor.enter("convergence-joining-3")
runOn(leader(first, second, third)) {
cluster.leaderActions()
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
}
testConductor.enter("leader-actions-3")
// leader gossipTo first non-leader
leader(first, second, third) gossipTo nonLeader(first, second, third).head
runOn(nonLeader(first, second, third).head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(leader(first, second, third), myself))
cluster.convergence.isDefined must be(false)
}
// first non-leader gossipTo the other non-leader
nonLeader(first, second, third).head gossipTo nonLeader(first, second, third).tail.head
runOn(nonLeader(first, second, third).head) {
cluster.gossipTo(node(nonLeader(first, second, third).tail.head).address)
}
runOn(nonLeader(first, second, third).tail.head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
// and back again
nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head
runOn(nonLeader(first, second, third).head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
// first non-leader gossipTo the leader
nonLeader(first, second, third).head gossipTo leader(first, second, third)
runOn(first, second, third) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
testConductor.enter("after-3")
}
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
runOn(fourth) {
cluster.join(fifth)
awaitMembers(fourth, fifth)
cluster.gossipTo(fifth)
awaitSeen(fourth, fifth)
cluster.convergence.isDefined must be(true)
}
runOn(fifth) {
awaitMembers(fourth, fifth)
cluster.gossipTo(fourth)
awaitSeen(fourth, fifth)
cluster.gossipTo(fourth)
cluster.convergence.isDefined must be(true)
}
testConductor.enter("fourth-joined-fifth")
testConductor.enter("after-4")
}
"perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in {
runOn(fourth) {
cluster.join(third)
}
runOn(third) {
awaitMembers(first, second, third, fourth)
seenLatestGossip must be(Set(third))
}
testConductor.enter("fourth-joined-third")
third gossipTo second
runOn(second) {
seenLatestGossip must be(Set(second, third))
}
second gossipTo fourth
runOn(fourth) {
members must be(roles.toSet)
// merge conflict
seenLatestGossip must be(Set(fourth))
}
fourth gossipTo first
fourth gossipTo second
fourth gossipTo third
fourth gossipTo fifth
runOn(first, second, third, fifth) {
members must be(roles.toSet)
seenLatestGossip must be(Set(fourth, myself))
}
first gossipTo fifth
runOn(fifth) {
seenLatestGossip must be(Set(first, fourth, fifth))
}
fifth gossipTo third
runOn(third) {
seenLatestGossip must be(Set(first, third, fourth, fifth))
}
third gossipTo second
runOn(second) {
seenLatestGossip must be(roles.toSet)
cluster.convergence.isDefined must be(true)
}
second gossipTo first
second gossipTo third
second gossipTo fourth
third gossipTo fifth
seenLatestGossip must be(roles.toSet)
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
memberStatus(fourth) must be(Joining)
memberStatus(fifth) must be(Up)
cluster.convergence.isDefined must be(true)
testConductor.enter("convergence-joining-3")
runOn(leader(roles: _*)) {
cluster.leaderActions()
memberStatus(fourth) must be(Up)
seenLatestGossip must be(Set(myself))
cluster.convergence.isDefined must be(false)
}
// spread the word
for (x :: y :: Nil ← (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) {
x gossipTo y
}
testConductor.enter("spread-5")
seenLatestGossip must be(roles.toSet)
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
memberStatus(fourth) must be(Up)
memberStatus(fifth) must be(Up)
cluster.convergence.isDefined must be(true)
testConductor.enter("after-5")
}
"perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in {
runOn(fifth) {
markNodeAsUnavailable(second)
cluster.reapUnreachableMembers()
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
seenLatestGossip must be(Set(fifth))
}
// spread the word
val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth)
for (x :: y :: Nil ← gossipRound.sliding(2)) {
x gossipTo y
}
runOn((roles.filterNot(_ == second)): _*) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
cluster.convergence.isDefined must be(false)
}
runOn(third) {
cluster.down(second)
awaitMemberStatus(second, Down)
}
// spread the word
val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth)
for (x :: y :: Nil ← gossipRound2.sliding(2)) {
x gossipTo y
}
runOn((roles.filterNot(_ == second)): _*) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Down))
memberStatus(second) must be(Down)
seenLatestGossip must be(Set(first, third, fourth, fifth))
cluster.convergence.isDefined must be(true)
}
testConductor.enter("after-6")
}
}
}

View file

@ -33,12 +33,12 @@ class GossipSpec extends WordSpec with MustMatchers {
val g2 = Gossip(members = SortedSet(a2, c2, e2))
val merged1 = g1 merge g2
merged1.members must be(SortedSet(a1, c1, e2))
merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
merged1.members must be(SortedSet(a2, c1, e1))
merged1.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
val merged2 = g2 merge g1
merged2.members must be(SortedSet(a1, c1, e2))
merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
merged2.members must be(SortedSet(a2, c1, e1))
merged2.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
}
@ -48,12 +48,12 @@ class GossipSpec extends WordSpec with MustMatchers {
val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2)))
val merged1 = g1 merge g2
merged1.overview.unreachable must be(Set(a1, b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
merged1.overview.unreachable must be(Set(a2, b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
val merged2 = g2 merge g1
merged2.overview.unreachable must be(Set(a1, b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
merged2.overview.unreachable must be(Set(a2, b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
}
@ -62,14 +62,14 @@ class GossipSpec extends WordSpec with MustMatchers {
val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2)))
val merged1 = g1 merge g2
merged1.members must be(SortedSet(a1))
merged1.members.toSeq.map(_.status) must be(Seq(Up))
merged1.members must be(SortedSet(a2))
merged1.members.toSeq.map(_.status) must be(Seq(Joining))
merged1.overview.unreachable must be(Set(b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
val merged2 = g2 merge g1
merged2.members must be(SortedSet(a1))
merged2.members.toSeq.map(_.status) must be(Seq(Up))
merged2.members must be(SortedSet(a2))
merged2.members.toSeq.map(_.status) must be(Seq(Joining))
merged2.overview.unreachable must be(Set(b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))

View file

@ -0,0 +1,138 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.{ Address, AddressFromURIString }
import java.net.InetSocketAddress
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import scala.collection.immutable.SortedSet
import scala.util.Random
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MemberOrderingSpec extends WordSpec with MustMatchers {
import Member.ordering
import Member.addressOrdering
import MemberStatus._
"An Ordering[Member]" must {
"order non-exiting members by host:port" in {
val members = SortedSet.empty[Member] +
Member(AddressFromURIString("akka://sys@darkstar:1112"), Up) +
Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining) +
Member(AddressFromURIString("akka://sys@darkstar:1111"), Up)
val seq = members.toSeq
seq.size must equal(3)
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Up))
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining))
}
"order exiting members by last" in {
val members = SortedSet.empty[Member] +
Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
Member(AddressFromURIString("akka://sys@darkstar:1113"), Up) +
Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining)
val seq = members.toSeq
seq.size must equal(3)
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining))
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Up))
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
}
"order multiple exiting members by last but internally by host:port" in {
val members = SortedSet.empty[Member] +
Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving) +
Member(AddressFromURIString("akka://sys@darkstar:1111"), Up) +
Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting)
val seq = members.toSeq
seq.size must equal(4)
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving))
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting))
seq(3) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
}
"be sorted by address correctly" in {
import Member.ordering
// sorting should be done on host and port, only
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
val expected = IndexedSeq(m1, m2, m3, m4, m5)
val shuffled = Random.shuffle(expected)
shuffled.sorted must be(expected)
(SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected)
}
"have stable equals and hashCode" in {
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining)
val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
m1 must be(m2)
m1.hashCode must be(m2.hashCode)
m3 must not be (m2)
m3 must not be (m1)
}
}
"An Ordering[Address]" must {
"order addresses by port" in {
val addresses = SortedSet.empty[Address] +
AddressFromURIString("akka://sys@darkstar:1112") +
AddressFromURIString("akka://sys@darkstar:1113") +
AddressFromURIString("akka://sys@darkstar:1110") +
AddressFromURIString("akka://sys@darkstar:1111")
val seq = addresses.toSeq
seq.size must equal(4)
seq(0) must equal(AddressFromURIString("akka://sys@darkstar:1110"))
seq(1) must equal(AddressFromURIString("akka://sys@darkstar:1111"))
seq(2) must equal(AddressFromURIString("akka://sys@darkstar:1112"))
seq(3) must equal(AddressFromURIString("akka://sys@darkstar:1113"))
}
"order addresses by hostname" in {
val addresses = SortedSet.empty[Address] +
AddressFromURIString("akka://sys@darkstar2:1110") +
AddressFromURIString("akka://sys@darkstar1:1110") +
AddressFromURIString("akka://sys@darkstar3:1110") +
AddressFromURIString("akka://sys@darkstar0:1110")
val seq = addresses.toSeq
seq.size must equal(4)
seq(0) must equal(AddressFromURIString("akka://sys@darkstar0:1110"))
seq(1) must equal(AddressFromURIString("akka://sys@darkstar1:1110"))
seq(2) must equal(AddressFromURIString("akka://sys@darkstar2:1110"))
seq(3) must equal(AddressFromURIString("akka://sys@darkstar3:1110"))
}
"order addresses by hostname and port" in {
val addresses = SortedSet.empty[Address] +
AddressFromURIString("akka://sys@darkstar2:1110") +
AddressFromURIString("akka://sys@darkstar0:1111") +
AddressFromURIString("akka://sys@darkstar2:1111") +
AddressFromURIString("akka://sys@darkstar0:1110")
val seq = addresses.toSeq
seq.size must equal(4)
seq(0) must equal(AddressFromURIString("akka://sys@darkstar0:1110"))
seq(1) must equal(AddressFromURIString("akka://sys@darkstar0:1111"))
seq(2) must equal(AddressFromURIString("akka://sys@darkstar2:1110"))
seq(3) must equal(AddressFromURIString("akka://sys@darkstar2:1111"))
}
}
}

View file

@ -1,45 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import scala.util.Random
import scala.collection.immutable.SortedSet
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MemberSpec extends WordSpec with MustMatchers {
"Member" must {
"be sorted by address correctly" in {
import Member.ordering
// sorting should be done on host and port, only
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up)
val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up)
val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up)
val expected = IndexedSeq(m1, m2, m3, m4, m5)
val shuffled = Random.shuffle(expected)
shuffled.sorted must be(expected)
(SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected)
}
"have stable equals and hashCode" in {
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining)
val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
m1 must be(m2)
m1.hashCode must be(m2.hashCode)
m3 must not be (m2)
m3 must not be (m1)
}
}
}

View file

@ -5,8 +5,7 @@
Cluster Specification
######################
.. note:: *This document describes the new clustering coming in Akka Coltrane and
is not available in the latest stable release)*
.. note:: *This document describes the new clustering coming in Akka Coltrane, which is not available in the latest stable release*
Intro
=====
@ -164,8 +163,8 @@ After gossip convergence a ``leader`` for the cluster can be determined. There i
``leader`` election process, the ``leader`` can always be recognised deterministically
by any node whenever there is gossip convergence. The ``leader`` is simply the first
node in sorted order that is able to take the leadership role, where the only
allowed member states for a ``leader`` are ``up`` or ``leaving`` (see below for more
information about member states).
allowed member states for a ``leader`` are ``up``, ``leaving`` or ``exiting`` (see
below for more information about member states).
The role of the ``leader`` is to shift members in and out of the cluster, changing
``joining`` members to the ``up`` state or ``exiting`` members to the
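
Since there is no election, any node can compute the leader locally from the
sorted member set. A hedged sketch of the selection (a hypothetical helper,
not the actual gossip implementation; assumes ``Member`` and ``MemberStatus``
from ``akka.cluster``)::

    import akka.actor.Address
    import akka.cluster.{ Member, MemberStatus }
    import scala.collection.immutable.SortedSet

    object LeaderSketch {
      // Member states that permit taking the leadership role.
      val allowed: Set[MemberStatus] =
        Set(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting)

      // The leader is the first member, in the cluster's sorted order,
      // whose status is one of the allowed states.
      def leader(members: SortedSet[Member]): Option[Address] =
        members.collectFirst { case m if allowed(m.status) ⇒ m.address }
    }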
@ -302,10 +301,6 @@ handoff has completed then the node will change to the ``exiting`` state. Once
all nodes have seen the exiting state (convergence) the ``leader`` will remove the
node from the cluster, marking it as ``removed``.
A node can also be removed forcefully by moving it directly to the ``removed``
state using the ``remove`` action. The cluster will rebalance based on the new
cluster membership.
If a node is unreachable then gossip convergence is not possible and therefore
any ``leader`` actions are also not possible (for instance, allowing a node to
become a part of the cluster, or changing actor distribution). To be able to
@ -314,11 +309,12 @@ unreachable node is experiencing only transient difficulties then it can be
explicitly marked as ``down`` using the ``down`` user action. When this node
comes back up and begins gossiping it will automatically go through the joining
process again. If the unreachable node will be permanently down then it can be
removed from the cluster directly with the ``remove`` user action. The cluster
can also *auto-down* a node using the accrual failure detector.
removed from the cluster directly by shutting the actor system down or killing it
through an external ``SIGKILL`` signal, invocation of ``System.exit(status)`` or
similar. The cluster can, through the leader, also *auto-down* a node.
This means that nodes can join and leave the cluster at any point in time,
e.g. provide cluster elasticity.
This means that nodes can join and leave the cluster at any point in time, i.e.
provide cluster elasticity.
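
For illustration, the ``down`` user action is a single call on the node's
``Cluster`` extension, just as the multi-jvm tests in this change do with
``cluster.down(second)`` (a hedged sketch; ``cluster`` is assumed to be the
local ``Cluster`` extension)::

    import akka.actor.Address
    import akka.cluster.Cluster

    // Mark a permanently unreachable node as DOWN so that gossip convergence,
    // and thereby leader actions, become possible again.
    def downNode(cluster: Cluster, address: Address): Unit =
      cluster.down(address)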
State Diagram for the Member States
@ -339,12 +335,12 @@ Member States
- **leaving** / **exiting**
states during graceful removal
- **removed**
tombstone state (no longer a member)
- **down**
marked as down/offline/unreachable
- **removed**
tombstone state (no longer a member)
User Actions
^^^^^^^^^^^^
@ -359,9 +355,6 @@ User Actions
- **down**
mark a node as temporarily down
- **remove**
remove a node from the cluster immediately
Leader Actions
^^^^^^^^^^^^^^

View file

@ -55,7 +55,7 @@ which is a user-level concern.
Ordering is preserved on a per-sender basis
-------------------------------------------
Actor ``A1` sends messages ``M1``, ``M2``, ``M3`` to ``A2``
Actor ``A1`` sends messages ``M1``, ``M2``, ``M3`` to ``A2``
Actor ``A3`` sends messages ``M4``, ``M5``, ``M6`` to ``A2``
This means that:
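
As a hedged illustration of this guarantee (all names are made up), each
sender's messages arrive in their send order, while the interleaving between
the two senders is left unspecified::

    import akka.actor._

    class Receiver extends Actor {
      def receive = { case msg ⇒ println(msg) }
    }
    class Sender(target: ActorRef, msgs: Seq[String]) extends Actor {
      override def preStart(): Unit = msgs foreach (target ! _)
      def receive = Actor.emptyBehavior
    }

    object PerSenderOrdering extends App {
      val system = ActorSystem("ordering")
      val a2 = system.actorOf(Props[Receiver], "A2")
      // A2 sees M1 before M2 before M3, and M4 before M5 before M6,
      // but the two streams may interleave arbitrarily.
      system.actorOf(Props(new Sender(a2, Seq("M1", "M2", "M3"))), "A1")
      system.actorOf(Props(new Sender(a2, Seq("M4", "M5", "M6"))), "A3")
    }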

View file

@ -63,20 +63,6 @@ case "$2" in
$JMX_CLIENT $HOST akka:type=Cluster leave=$ACTOR_SYSTEM_URL
;;
remove)
if [ $# -ne 3 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> remove <actor-system-url-to-join>"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
ACTOR_SYSTEM_URL=$2
echo "Scheduling $ACTOR_SYSTEM_URL to REMOVE"
$JMX_CLIENT $HOST akka:type=Cluster remove=$ACTOR_SYSTEM_URL
;;
down)
if [ $# -ne 3 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> down <actor-system-url-to-join>"
@ -169,19 +155,32 @@ case "$2" in
$JMX_CLIENT $HOST akka:type=Cluster Available
;;
is-running)
if [ $# -ne 2 ]; then
echo "Usage: $SELF <node-hostname:jmx-port> is-running"
exit 1
fi
ensureNodeIsRunningAndAvailable
shift
echo "Checking if member node on $HOST is AVAILABLE"
$JMX_CLIENT $HOST akka:type=Cluster Running
;;
*)
printf "Usage: bin/$SELF <node-hostname:jmx-port> <command> ...\n"
printf "\n"
printf "Supported commands are:\n"
printf "%26s - %s\n" "join <actor-system-url>" "Sends request a JOIN node with the specified URL"
printf "%26s - %s\n" "leave <actor-system-url>" "Sends a request for node with URL to LEAVE the cluster"
printf "%26s - %s\n" "remove <actor-system-url>" "Sends a request for node with URL to be instantly REMOVED from the cluster"
printf "%26s - %s\n" "down <actor-system-url>" "Sends a request for marking node with URL as DOWN"
printf "%26s - %s\n" member-status "Asks the member node for its current status"
printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)"
printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
printf "%26s - %s\n" is-available "Checks if the member node is available"
printf "%26s - %s\n" is-running "Checks if the member node is running"
printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence"
printf "Where the <actor-system-url> should be on the format of 'akka://actor-system-name@hostname:port'\n"
printf "\n"

View file

@ -190,9 +190,10 @@ akka {
# 'TLSv1.1', 'TLSv1.2'
protocol = "TLSv1"
# Examples: [ "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA" ]
# You need to install the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256
# More info here: http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider
supported-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"]
enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"]
# Using /dev/./urandom is only necessary when using SHA1PRNG on Linux to prevent blocking
# It is NOT as secure because it reuses the seed
@ -207,6 +208,7 @@ akka {
# The following use one of 3 possible seed sources, depending on availability: /dev/random, random.org and SecureRandom (provided by Java)
# "AES128CounterRNGSecure"
# "AES256CounterRNGSecure" (Install JCE Unlimited Strength Jurisdiction Policy Files first)
# Setting a value here may require you to supply the appropriate cipher suite (see enabled-algorithms section above)
random-number-generator = ""
}
}
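
Tying these settings together, a hedged sketch of a minimal SSL-enabled
remoting configuration (store paths and passwords are placeholders), assembled
the same way the Ticket1978 specs below build theirs:

    import com.typesafe.config.ConfigFactory

    // Placeholder paths and passwords; the AES 256 suite additionally requires
    // the JCE Unlimited Strength Jurisdiction Policy Files to be installed.
    val sslConf = ConfigFactory.parseString("""
      akka.remote.netty.ssl {
        enable = on
        key-store = "/path/to/keystore"
        key-store-password = "changeme"
        trust-store = "/path/to/truststore"
        trust-store-password = "changeme"
        protocol = "TLSv1"
        enabled-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"]
        random-number-generator = ""
      }
    """)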

View file

@ -17,13 +17,17 @@ import akka.security.provider.AkkaProvider
* Internal use only
*/
private[akka] object NettySSLSupport {
val akka = new AkkaProvider
Security.addProvider(akka)
/**
* Construct a SSLHandler which can be inserted into a Netty server/client pipeline
*/
def apply(settings: NettySettings, log: LoggingAdapter, isClient: Boolean): SslHandler =
if (isClient) initialiseClientSSL(settings, log) else initialiseServerSSL(settings, log)
if (isClient) initializeClientSSL(settings, log) else initializeServerSSL(settings, log)
def initialiseCustomSecureRandom(rngName: Option[String], sourceOfRandomness: Option[String], log: LoggingAdapter): SecureRandom = {
def initializeCustomSecureRandom(rngName: Option[String], sourceOfRandomness: Option[String], log: LoggingAdapter): SecureRandom = {
/**
* According to this bug report: http://bugs.sun.com/view_bug.do?bug_id=6202721
* Using /dev/./urandom is only necessary when using SHA1PRNG on Linux
@ -34,8 +38,6 @@ private[akka] object NettySSLSupport {
val rng = rngName match {
case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure")) ⇒
log.debug("SSL random number generator set to: {}", r)
val akka = new AkkaProvider
Security.addProvider(akka)
SecureRandom.getInstance(r, akka)
case Some("SHA1PRNG")
log.debug("SSL random number generator set to: SHA1PRNG")
@ -53,8 +55,23 @@ private[akka] object NettySSLSupport {
rng
}
private def initialiseClientSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
def initializeClientSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
log.debug("Client SSL is enabled, initialising ...")
def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] =
try {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
trustManagerFactory.init(trustStore)
val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
} catch {
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e)
case e: IOException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because SSL context could not be constructed", e)
}
((settings.SSLTrustStore, settings.SSLTrustStorePassword, settings.SSLProtocol) match {
case (Some(trustStore), Some(password), Some(protocol)) ⇒ constructClientContext(settings, log, trustStore, password, protocol)
case (trustStore, password, protocol) ⇒ throw new GeneralSecurityException(
@ -67,11 +84,11 @@ private[akka] object NettySSLSupport {
log.debug("Using client SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(true)
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString))
new SslHandler(sslEngine)
case None ⇒
throw new GeneralSecurityException(
"""Failed to initialise client SSL because SSL context could not be found." +
"""Failed to initialize client SSL because SSL context could not be found." +
"Make sure your settings are correct: [trust-store: %s] [trust-store-password: %s] [protocol: %s]""".format(
settings.SSLTrustStore,
settings.SSLTrustStorePassword,
@ -79,24 +96,22 @@ private[akka] object NettySSLSupport {
}
}
private def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = {
try {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
trustManagerFactory.init(trustStore)
val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initialiseCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
} catch {
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e)
case e: IOException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because SSL context could not be constructed", e)
}
}
private def initialiseServerSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
def initializeServerSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
log.debug("Server SSL is enabled, initialising ...")
def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] =
try {
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
factory.init(keyStore, keyStorePassword.toCharArray)
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
} catch {
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
}
((settings.SSLKeyStore, settings.SSLKeyStorePassword, settings.SSLProtocol) match {
case (Some(keyStore), Some(password), Some(protocol)) ⇒ constructServerContext(settings, log, keyStore, password, protocol)
case (keyStore, password, protocol) ⇒ throw new GeneralSecurityException(
@ -106,28 +121,14 @@ private[akka] object NettySSLSupport {
log.debug("Using server SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(false)
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString))
new SslHandler(sslEngine)
case None ⇒ throw new GeneralSecurityException(
"""Failed to initialise server SSL because SSL context could not be found.
"""Failed to initialize server SSL because SSL context could not be found.
Make sure your settings are correct: [key-store: %s] [key-store-password: %s] [protocol: %s]""".format(
settings.SSLKeyStore,
settings.SSLKeyStorePassword,
settings.SSLProtocol))
}
}
private def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = {
try {
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
factory.init(keyStore, keyStorePassword.toCharArray)
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initialiseCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
} catch {
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
}
}
}

View file

@ -106,7 +106,7 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
case password ⇒ Some(password)
}
val SSLSupportedAlgorithms = getStringList("ssl.supported-algorithms")
val SSLEnabledAlgorithms = getStringList("ssl.enabled-algorithms").toArray.toSet
val SSLProtocol = getString("ssl.protocol") match {
case "" None

View file

@ -9,27 +9,36 @@ import com.typesafe.config._
import akka.dispatch.{ Await, Future }
import akka.pattern.ask
import java.io.File
import java.security.{ SecureRandom, PrivilegedAction, AccessController }
import netty.NettySSLSupport
import akka.event.{ NoLogging, LoggingAdapter }
import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController }
import netty.{ NettySettings, NettySSLSupport }
import javax.net.ssl.SSLException
import akka.util.{ Timeout, Duration }
import akka.util.duration._
object Configuration {
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
// The certificate will expire in 2109
private val trustStore = getPath("truststore")
private val keyStore = getPath("keystore")
private def getPath(name: String): String = (new File("akka-remote/src/test/resources/" + name)).getAbsolutePath.replace("\\", "\\\\")
private val trustStore = getClass.getClassLoader.getResource("truststore").getPath
private val keyStore = getClass.getClassLoader.getResource("keystore").getPath
private val conf = """
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
test {
single-expect-default = 10s
filter-leeway = 10s
default-timeout = 10s
}
remote.netty {
hostname = localhost
port = 12345
ssl {
enable = on
trust-store = "%s"
key-store = "%s"
random-number-generator = "%s"
enabled-algorithms = [%s]
sha1prng-random-source = "/dev/./urandom"
}
}
actor.deployment {
@ -40,70 +49,91 @@ object Configuration {
}
"""
def getCipherConfig(cipher: String): (String, Boolean, Config) = if (try {
NettySSLSupport.initialiseCustomSecureRandom(Some(cipher), None, NoLogging) ne null
def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = try {
if (true) throw new IllegalArgumentException("Test not enabled")
val config = ConfigFactory.parseString("akka.remote.netty.port=12345").withFallback(ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", "))))
val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty")
val settings = new NettySettings(fullConfig, "placeholder")
val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging)
rng.nextInt() // Has to work
settings.SSLRandomNumberGenerator foreach { sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) }
val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine
val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet
val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet
gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported))
gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled))
engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get))
(cipher, true, config)
} catch {
case iae: IllegalArgumentException if iae.getMessage == "Cannot support %s with currently installed providers".format(cipher) ⇒ false
}) (cipher, true, ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher))) else (cipher, false, AkkaSpec.testConf)
case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) ⇒ (cipher, false, AkkaSpec.testConf) // Cannot match against the message since the message might be localized :S
}
}
import Configuration.getCipherConfig
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG"))
class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG", "TLS_RSA_WITH_AES_128_CBC_SHA"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES128CounterRNGFastSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGFast"))
class Ticket1978AES128CounterRNGFastSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGFast", "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
/**
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
*/
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES128CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGSecure"))
class Ticket1978AES128CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGSecure", "TLS_RSA_WITH_AES_128_CBC_SHA"))
/**
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
*/
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES256CounterRNGSecure"))
class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES256CounterRNGSecure", "TLS_RSA_WITH_AES_256_CBC_SHA"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978DefaultRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "TLS_RSA_WITH_AES_128_CBC_SHA"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender with DefaultTimeout {
abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender {
implicit val timeout: Timeout = Timeout(5 seconds)
import RemoteCommunicationSpec._
val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config)
val other = ActorSystem("remote-sys", conf)
val remote = other.actorOf(Props(new Actor {
def receive = {
case "ping" sender ! (("pong", sender))
}
}), "echo")
val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo")
val other = ActorSystem("remote-sys", ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config))
override def atTermination() {
other.shutdown()
other.awaitTermination()
}
"SSL Remoting" must {
if (cipherEnabledconfig._2) {
val remote = other.actorOf(Props(new Actor { def receive = { case "ping" ⇒ sender ! (("pong", sender)) } }), "echo")
val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo")
"support remote look-ups" in {
here ! "ping"
expectMsgPF() {
expectMsgPF(timeout.duration) {
case ("pong", s: AnyRef) if s eq testActor true
}
}
"send error message for wrong address" in {
within(timeout.duration) {
EventFilter.error(start = "dropping", occurrences = 1).intercept {
system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping"
}(other)
}
}
"support ask" in {
Await.result(here ? "ping", timeout.duration) match {
@ -113,12 +143,15 @@ abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boo
}
"send dead letters on remote if actor does not exist" in {
within(timeout.duration) {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh"
}(other)
}
}
"create and supervise children on remote node" in {
within(timeout.duration) {
val r = system.actorOf(Props[Echo], "blub")
r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub"
r ! 42
@ -129,11 +162,11 @@ abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boo
expectMsg("preRestart")
r ! 42
expectMsg(42)
system.stop(r)
expectMsg("postStop")
}
}
"look-up actors across node boundaries" in {
within(timeout.duration) {
val l = system.actorOf(Props(new Actor {
def receive = {
case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n)
@ -155,10 +188,11 @@ abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boo
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
}
}
"not fail ask across node boundaries" in {
val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)]
Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong"))
Await.result(Future.sequence(f), timeout.duration).map(_._1).toSet must be(Set("pong"))
}
} else {
"not be run when the cipher is not supported by the platform this test is currently being executed on" ignore {

View file

@ -40,7 +40,7 @@ akka {
SSLTrustStore must be(Some("truststore"))
SSLTrustStorePassword must be(Some("changeme"))
SSLProtocol must be(Some("TLSv1"))
SSLSupportedAlgorithms must be(java.util.Arrays.asList("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
SSLEnabledAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA"))
SSLRandomSource must be(None)
SSLRandomNumberGenerator must be(None)
}

View file

@ -75,7 +75,9 @@ object AkkaKernelPlugin extends Plugin {
copyFiles(libFiles(cp, conf.libFilter), distLibPath)
copyFiles(conf.additionalLibs, distLibPath)
for (subTarget ← subProjectDependencies.map(_.target)) {
for (subProjectDependency ← subProjectDependencies) {
val subTarget = subProjectDependency.target
EvaluateTask(buildStruct, packageBin in Compile, st, subProjectDependency.projectRef)
copyJars(subTarget, distLibPath)
}
log.info("Distribution created.")
@ -220,10 +222,10 @@ object AkkaKernelPlugin extends Plugin {
}.toList
val target = setting(Keys.crossTarget, "Missing crossTarget directory")
SubProjectInfo(project.id, target, subProjects)
SubProjectInfo(projectRef, target, subProjects)
}
private case class SubProjectInfo(id: String, target: File, subProjects: Seq[SubProjectInfo]) {
private case class SubProjectInfo(projectRef: ProjectRef, target: File, subProjects: Seq[SubProjectInfo]) {
def recursiveSubProjects: Set[SubProjectInfo] = {
val flatSubProjects = for {

View file

@ -5,9 +5,7 @@
package akka.testkit
import akka.actor._
import akka.util.Duration
import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.Stack
import akka.dispatch._
import akka.pattern.ask

View file

@ -492,7 +492,8 @@ trait TestKitBase {
@tailrec
def doit(acc: List[T], count: Int): List[T] = {
if (count >= messages) return acc.reverse
if (count >= messages) acc.reverse
else {
receiveOne((stop - now) min idle)
lastMessage match {
case NullMessage ⇒
@ -507,6 +508,7 @@ trait TestKitBase {
acc.reverse
}
}
}
val ret = doit(Nil, 0)
lastWasNoMsg = true

View file

@ -57,7 +57,7 @@ public class UntypedCoordinatedIncrementTest {
Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS);
@Before
public void initialise() {
public void initialize() {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;

View file

@ -58,7 +58,7 @@ public class UntypedTransactorTest {
Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS);
@Before
public void initialise() {
public void initialize() {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;

View file

@ -9,14 +9,17 @@ import akka.actor._
import akka.dispatch.{ Promise, Future }
import akka.event.Logging
import annotation.tailrec
import akka.util.Duration
import java.util.concurrent.TimeUnit
import collection.mutable.ListBuffer
import akka.util.{ NonFatal, Duration }
private[zeromq] object ConcurrentSocketActor {
private sealed trait PollMsg
private case object Poll extends PollMsg
private case object PollCareful extends PollMsg
private case object Flush
private class NoSocketHandleException() extends Exception("Couldn't create a zeromq socket.")
private val DefaultContext = Context()
@ -32,19 +35,28 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
import SocketType.{ ZMQSocketType ⇒ ST }
params.collectFirst { case t: ST ⇒ t }.getOrElse(throw new IllegalArgumentException("A socket type is required"))
}
private val socket: Socket = zmqContext.socket(socketType)
private val poller: Poller = zmqContext.poller
private val log = Logging(context.system, this)
private val pendingSends = new ListBuffer[Seq[Frame]]
def receive = {
case m: PollMsg ⇒ doPoll(m)
case ZMQMessage(frames) ⇒ sendMessage(frames)
case ZMQMessage(frames) ⇒ handleRequest(Send(frames))
case r: Request ⇒ handleRequest(r)
case Flush ⇒ flush()
case Terminated(_) ⇒ context stop self
}
private def handleRequest(msg: Request): Unit = msg match {
case Send(frames) ⇒ sendMessage(frames)
case Send(frames) ⇒
if (frames.nonEmpty) {
val flushNow = pendingSends.isEmpty
pendingSends.append(frames)
if (flushNow) flush()
}
case opt: SocketOption ⇒ handleSocketOption(opt)
case q: SocketOptionQuery ⇒ handleSocketOptionQuery(q)
}
@ -117,48 +129,46 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
}
}
private def setupConnection() {
private def setupConnection(): Unit = {
params filter (_.isInstanceOf[SocketConnectOption]) foreach { self ! _ }
params filter (_.isInstanceOf[PubSubOption]) foreach { self ! _ }
}
private def deserializerFromParams = {
private def deserializerFromParams: Deserializer =
params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer
}
private def setupSocket() = {
params foreach {
private def setupSocket() = params foreach {
case _: SocketConnectOption | _: PubSubOption | _: SocketMeta ⇒ // ignore, handled differently
case m ⇒ self ! m
}
}
override def preRestart(reason: Throwable, message: Option[Any]) {
context.children foreach context.stop //Do not call postStop
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = context.children foreach context.stop //Do not call postStop
override def postRestart(reason: Throwable) {} //Do nothing
override def postRestart(reason: Throwable): Unit = () // Do nothing
override def postStop {
try {
override def postStop: Unit = try {
if (socket != null) {
poller.unregister(socket)
socket.close
}
} finally {
notifyListener(Closed)
} finally notifyListener(Closed)
@tailrec private def flushMessage(i: Seq[Frame]): Boolean =
if (i.isEmpty)
true
else {
val head = i.head
val tail = i.tail
if (socket.send(head.payload.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail)
else {
pendingSends.prepend(i) // Reenqueue the rest of the message so the next flush takes care of it
self ! Flush
false
}
}
private def sendMessage(frames: Seq[Frame]) {
def sendBytes(bytes: Seq[Byte], flags: Int) = socket.send(bytes.toArray, flags)
val iter = frames.iterator
while (iter.hasNext) {
val payload = iter.next.payload
val flags = if (iter.hasNext) JZMQ.SNDMORE else 0
sendBytes(payload, flags)
}
}
@tailrec private def flush(): Unit =
if (pendingSends.nonEmpty && flushMessage(pendingSends.remove(0))) flush() // Flush while things are going well
// this is a PollMsg=>Unit which either polls or schedules Poll, depending on the sign of the timeout
private val doPollTimeout = {

View file

@ -139,8 +139,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
*/
def newSocket(socketParameters: SocketOption*): ActorRef = {
implicit val timeout = NewSocketTimeout
val req = (zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef]
Await.result(req, timeout.duration)
Await.result((zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef], timeout.duration)
}
/**
@ -248,9 +247,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
case _ ⇒ false
}
def receive = {
case p: Props ⇒ sender ! context.actorOf(p)
}
def receive = { case p: Props ⇒ sender ! context.actorOf(p) }
}), "zeromq")
}