Merge branch 'master' into wip-2202-cluster-domain-events-patriknw

Conflicts:
	akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
This commit is contained in:
Patrik Nordwall 2012-08-16 18:54:10 +02:00
commit 1700edb863
27 changed files with 336 additions and 378 deletions

View file

@ -6,12 +6,10 @@ package akka.cluster
import scala.collection.immutable.SortedSet
import scala.concurrent.util.{ Deadline, Duration }
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, RootActorPath, PoisonPill, Scheduler }
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler }
import akka.actor.Status.Failure
import akka.event.EventStream
import akka.routing.ScatterGatherFirstCompletedRouter
import akka.util.Timeout
import akka.pattern.{ AskTimeoutException, ask, pipe }
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import language.existentials
@ -75,15 +73,20 @@ private[cluster] object InternalClusterAction {
*/
case class InitJoinAck(address: Address) extends ClusterMessage
case object GossipTick
/**
* Marker interface for periodic tick messages
*/
sealed trait Tick
case object HeartbeatTick
case object GossipTick extends Tick
case object ReapUnreachableTick
case object HeartbeatTick extends Tick
case object LeaderActionsTick
case object ReapUnreachableTick extends Tick
case object PublishStatsTick
case object LeaderActionsTick extends Tick
case object PublishStatsTick extends Tick
case class SendClusterMessage(to: Address, msg: ClusterMessage)
@ -215,7 +218,14 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
})
override def preStart(): Unit = {
if (AutoJoin) self ! InternalClusterAction.JoinSeedNode
if (AutoJoin) {
// only the node which is named first in the list of seed nodes will join itself
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
self ! JoinTo(selfAddress)
else
context.actorOf(Props(new JoinSeedNodeProcess(environment)).
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
}
}
override def postStop(): Unit = {
@ -226,7 +236,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
publishStateTask foreach { _.cancel() }
}
def receive = {
def uninitialized: Actor.Receive = {
case InitJoin // skip, not ready yet
case JoinTo(address) join(address)
case Subscribe(subscriber, to) subscribe(subscriber, to)
case Unsubscribe(subscriber) unsubscribe(subscriber)
case _: Tick // ignore periodic tasks until initialized
}
def initialized: Actor.Receive = {
case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipMergeConflict receiveGossipMerge(msg)
case GossipTick gossip()
@ -234,10 +252,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats()
case JoinSeedNode joinSeedNode()
case InitJoin initJoin()
case InitJoinAck(address) join(address)
case Failure(e: AskTimeoutException) joinSeedNodeTimeout()
case JoinTo(address) join(address)
case ClusterUserAction.Join(address) joining(address)
case ClusterUserAction.Down(address) downing(address)
@ -251,21 +266,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
}
def joinSeedNode(): Unit = {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress self.path.toStringWithAddress(a) }
if (seedRoutees.isEmpty) join(selfAddress)
else {
implicit val within = Timeout(SeedNodeTimeout)
val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration)))
seedRouter ! InitJoin
seedRouter ! PoisonPill
}
}
def receive = uninitialized
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodeTimeout(): Unit = join(selfAddress)
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
@ -281,7 +285,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
publish(localGossip)
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
context.become(initialized)
if (address == selfAddress)
joining(address)
else
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
}
/**
@ -870,6 +878,53 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def ping(p: Ping): Unit = sender ! Pong(p)
}
/**
* INTERNAL API.
*
* Sends InitJoinAck to all seed nodes (except itself) and expect
* InitJoinAck reply back. The seed node that replied first
* will be used, joined to. InitJoinAck replies received after the
* first one are ignored.
*
* Retries if no InitJoinAck replies are received within the
* SeedNodeTimeout.
* When at least one reply has been received it stops itself after
* an idle SeedNodeTimeout.
*
*/
private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging {
import InternalClusterAction._
def selfAddress = environment.selfAddress
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
throw new IllegalArgumentException("Join seed node should not be done")
context.setReceiveTimeout(environment.settings.SeedNodeTimeout)
override def preStart(): Unit = self ! JoinSeedNode
def receive = {
case JoinSeedNode
// send InitJoin to all seed nodes (except myself)
environment.seedNodes.collect {
case a if a != selfAddress context.system.actorFor(context.parent.path.toStringWithAddress(a))
} foreach { _ ! InitJoin }
case InitJoinAck(address)
// first InitJoinAck reply
context.parent ! JoinTo(address)
context.become(done)
case ReceiveTimeout
// no InitJoinAck received, try again
self ! JoinSeedNode
}
def done: Actor.Receive = {
case InitJoinAck(_) // already received one, skip rest
case ReceiveTimeout context.stop(self)
}
}
/**
* INTERNAL API.
*/