Merge branch 'master' into wip-scala210M4-√

This commit is contained in:
Viktor Klang 2012-07-11 23:31:07 +02:00
commit df2389f3bf
3 changed files with 81 additions and 109 deletions

View file

@@ -246,14 +246,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def joinSeedNode(): Unit = {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress self.path.toStringWithAddress(a) }
if (seedRoutees.isEmpty) {
join(selfAddress)
} else {
if (seedRoutees.isEmpty) join(selfAddress)
else {
implicit val within = Timeout(SeedNodeTimeout)
val seedRouter = context.actorOf(
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
routees = seedRoutees, within = within.duration)))
seedRouter ? InitJoin pipeTo self
val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration)))
seedRouter ! InitJoin
seedRouter ! PoisonPill
}
}
@@ -277,8 +274,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
notifyListeners(localGossip)
val command = ClusterUserAction.Join(selfAddress)
coreSender ! SendClusterMessage(address, command)
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
}
/**
@@ -383,9 +379,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val localUnreachableMembers = localOverview.unreachable
// 1. check if the node to DOWN is in the 'members' set
val downedMember: Option[Member] = localMembers.collectFirst {
case m if m.address == address m.copy(status = Down)
}
val downedMember: Option[Member] =
localMembers.collectFirst { case m if m.address == address m.copy(status = Down) }
val newMembers = downedMember match {
case Some(m)
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
@@ -407,9 +403,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
case m if m.status == Down m.address
}
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down m.address }
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
@@ -476,45 +470,29 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate) {
coreSender ! SendClusterMessage(
to = localGossip.leader.get,
msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
} else {
if (rate <= MaxGossipMergeRate)
coreSender ! SendClusterMessage(to = localGossip.leader.get, msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
else
log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
}
} else {
val winningGossip =
if (conflict) {
// conflicting versions, merge, and new version
val mergedGossip = remoteGossip merge localGossip
mergedGossip :+ vclockNode
} else if (remoteGossip.version < localGossip.version) {
// local gossip is newer
localGossip
} else {
// remote gossip is newer
remoteGossip
}
if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version
else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
else remoteGossip // remote gossip is newer
val newJoinInProgress =
if (joinInProgress.isEmpty) joinInProgress
else joinInProgress --
winningGossip.members.map(_.address) --
winningGossip.overview.unreachable.map(_.address)
else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address)
latestGossip = winningGossip seen selfAddress
joinInProgress = newJoinInProgress
// for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { node
failureDetector.remove(node.address)
(latestGossip.members -- localGossip.members).foreach {
node if (node.status == Joining) failureDetector.remove(node.address)
}
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
@@ -551,30 +529,24 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
if (!isSingletonCluster && isAvailable) {
val localGossip = latestGossip
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
val localMembersSize = localMembers.size
val localMemberAddresses = localMembers map { _.address }
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
val localUnreachableSize = localUnreachableMembers.size
// gossip to a random alive member with preference to a member
// with older or newer gossip version
val nodesWithdifferentView = {
val preferredGossipTargets =
if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view
// gossip to a random alive member with preference to a member with older or newer gossip version
val localMemberAddressesSet = localGossip.members map { _.address }
for {
val nodesWithDifferentView = for {
(address, version) localGossip.overview.seen
if localMemberAddressesSet contains address
if version != localGossip.version
} yield address
}
val gossipedToAlive =
if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability)
gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq)
else
gossipToRandomNodeOf(localMemberAddresses)
nodesWithDifferentView.toIndexedSeq
} else Vector.empty[Address]
gossipToRandomNodeOf(
if (preferredGossipTargets.nonEmpty) preferredGossipTargets
else localGossip.members.toIndexedSeq[Member].map(_.address) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
)
}
}
@@ -599,10 +571,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
}
// Leader actions are as follows:
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 2. Move JOINING => UP -- When a node joins the cluster
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 1. Move JOINING => UP -- When a node joins the cluster
// 2. Move LEAVING => EXITING -- When all partition handoff has completed
// 3. Non-exiting remain -- When all partition handoff has completed
// 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table
@@ -621,27 +594,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes
// transform the node member ring - filterNot/map/map
val newMembers =
localMembers filterNot { member
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
member.status == MemberStatus.Exiting
} map { member
// ----------------------
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == Joining) member copy (status = Up)
else member
} map { member
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting)
else member
// transform the node member ring
val newMembers = localMembers collect {
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
case member if member.status == Joining member copy (status = Up)
// 2. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully member copy (status = Exiting)
// 3. Everyone else that is not Exiting stays as they are
case member if member.status != Exiting member
// 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
}
// ----------------------
@@ -657,7 +618,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
@@ -670,22 +631,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member])
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Member.none)
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers.map { member
val newUnreachableMembers = localUnreachableMembers collect {
// ----------------------
// 5. Move UNREACHABLE => DOWN (auto-downing by leader)
// 6. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
if (member.status == Down) member // no need to DOWN members already DOWN
else member copy (status = Down)
case member if member.status != Down member copy (status = Down)
case downMember downMember // no need to DOWN members already DOWN
}
// Check for the need to do side-effecting on successful state change
val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down)
val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down)
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down m.address }
@@ -693,9 +654,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers)
(newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, unreachableButNotDownedMembers)
} else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member])
} else (localGossip, false, Member.none, Member.none, Member.none, Member.none)
if (hasChangedState) { // we have a change of state - version it and try to update
// ----------------------
@@ -757,20 +718,14 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys
val deadline = Deadline.now + HeartbeatInterval
for (address beatTo; if address != selfAddress)
heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline)
beatTo.foreach { address if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) }
}
/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
val overdueJoins = joinInProgress collect {
case (address, deadline) if deadline.isOverdue address
}
if (overdueJoins.nonEmpty) {
joinInProgress = joinInProgress -- overdueJoins
}
joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue address }
}
/**

View file

@@ -26,6 +26,8 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess
*/
object Member {
val none = Set.empty[Member]
/**
* `Address` ordering type class, sorts addresses by host and port.
*/
@@ -56,7 +58,7 @@ object Member {
// group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
(Member.none /: groupedByAddress) {
case (acc, (_, members)) acc + members.reduceLeft(highestPriorityOf)
}
}

View file

@@ -5,8 +5,7 @@
package akka.remote.netty
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.concurrent.Executors
import scala.collection.mutable.HashMap
@@ -17,12 +16,13 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBa
import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder }
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
import org.jboss.netty.handler.timeout.IdleStateHandler
import org.jboss.netty.util.HashedWheelTimer
import org.jboss.netty.util.{ DefaultObjectSizeEstimator, HashedWheelTimer }
import akka.event.Logging
import akka.remote.RemoteProtocol.AkkaRemoteProtocol
import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted }
import akka.util.NonFatal
import akka.actor.{ ExtendedActorSystem, Address, ActorRef }
import com.google.protobuf.MessageLite
private[akka] object ChannelAddress extends ChannelLocal[Option[Address]] {
override def initialValue(ch: Channel): Option[Address] = None
@@ -108,9 +108,24 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
settings.MaxTotalMemorySize,
settings.ExecutionPoolKeepalive.length,
settings.ExecutionPoolKeepalive.unit,
AkkaProtocolMessageSizeEstimator,
system.threadFactory)))
else Nil
/**
* Helps keep track of how many bytes are in flight
*/
object AkkaProtocolMessageSizeEstimator extends DefaultObjectSizeEstimator {
override final def estimateSize(o: AnyRef): Int =
o match {
case proto: MessageLite
val msgSize = proto.getSerializedSize
val misalignment = msgSize % 8
if (misalignment != 0) msgSize + 8 - misalignment else msgSize
case msg super.estimateSize(msg)
}
}
/**
* Construct an authentication handler which uses the SecureCookie to somewhat
* protect the TCP port from unauthorized use (don't rely on it too much, though,