From f1375b6bfbd89d02d81c78dcb75db448e5abfbd2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 7 Jul 2012 20:55:02 +0200 Subject: [PATCH 1/3] Making some changes in the cluster code to avoid doing work that isn't needed --- .../src/main/scala/akka/actor/ActorCell.scala | 6 +- .../scala/akka/cluster/ClusterDaemon.scala | 165 +++++++----------- .../src/main/scala/akka/cluster/Member.scala | 4 +- 3 files changed, 65 insertions(+), 110 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f922daea70..8bbb149858 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -508,8 +508,7 @@ private[akka] class ActorCell( } private def isTerminating = childrenRefs match { - case TerminatingChildrenContainer(_, _, Termination) ⇒ true - case TerminatedChildrenContainer ⇒ true + case TerminatedChildrenContainer | TerminatingChildrenContainer(_, _, Termination) ⇒ true case _ ⇒ false } private def isNormal = childrenRefs match { @@ -981,11 +980,10 @@ private[akka] class ActorCell( childrenRefs match { case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ val n = removeChild(child) - actor.supervisorStrategy.handleChildTerminated(this, child, children) if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" case Termination ⇒ doTerminate() - case _ ⇒ + case _ ⇒ actor.supervisorStrategy.handleChildTerminated(this, child, children) } case _ ⇒ removeChild(child) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index bf3e5a6b60..c23d345401 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -258,14 +258,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def joinSeedNode(): Unit = { val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) } - if (seedRoutees.isEmpty) { - join(selfAddress) - } else { + if (seedRoutees.isEmpty) join(selfAddress) + else { implicit val within = Timeout(SeedNodeTimeout) - val seedRouter = context.actorOf( - Props.empty.withRouter(ScatterGatherFirstCompletedRouter( - routees = seedRoutees, within = within.duration))) - seedRouter ? InitJoin pipeTo self + val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration))) + seedRouter ! InitJoin seedRouter ! PoisonPill } } @@ -289,8 +286,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) notifyListeners(localGossip) - val command = ClusterUserAction.Join(selfAddress) - coreSender ! SendClusterMessage(address, command) + coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) } /** @@ -395,9 +391,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val localUnreachableMembers = localOverview.unreachable // 1. 
check if the node to DOWN is in the 'members' set - val downedMember: Option[Member] = localMembers.collectFirst { - case m if m.address == address ⇒ m.copy(status = Down) - } + val downedMember: Option[Member] = + localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) } + val newMembers = downedMember match { case Some(m) ⇒ log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address) @@ -419,9 +415,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember // 4. remove nodes marked as DOWN from the 'seen' table - val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { - case m if m.status == Down ⇒ m.address - } + val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down ⇒ m.address } // update gossip overview val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) @@ -488,45 +482,29 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) stats = stats.incrementMergeConflictCount val rate = mergeRate(stats.mergeConflictCount) - if (rate <= MaxGossipMergeRate) { - coreSender ! SendClusterMessage( - to = localGossip.leader.get, - msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope)) - } else { + + if (rate <= MaxGossipMergeRate) + coreSender ! SendClusterMessage(to = localGossip.leader.get, msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope)) + else log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate) - } } else { val winningGossip = - - if (conflict) { - // conflicting versions, merge, and new version - val mergedGossip = remoteGossip merge localGossip - mergedGossip :+ vclockNode - - } else if (remoteGossip.version < localGossip.version) { - // local gossip is newer - localGossip - - } else { - // remote gossip is newer - remoteGossip - - } + if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version + else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer + else remoteGossip // remote gossip is newer val newJoinInProgress = if (joinInProgress.isEmpty) joinInProgress - else joinInProgress -- - winningGossip.members.map(_.address) -- - winningGossip.overview.unreachable.map(_.address) + else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address) latestGossip = winningGossip seen selfAddress joinInProgress = newJoinInProgress // for all new joining nodes we remove them from the failure detector - (latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach { node ⇒ - failureDetector.remove(node.address) + (latestGossip.members -- localGossip.members).foreach { + node ⇒ if (node.status == Joining) failureDetector.remove(node.address) } log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) @@ -563,30 +541,24 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) if (!isSingletonCluster && isAvailable) { val localGossip = latestGossip - // important to not accidentally use `map` of the SortedSet, since the original order is not preserved - val localMembers = localGossip.members.toIndexedSeq - val localMembersSize = localMembers.size - val localMemberAddresses = localMembers map { _.address } - val localUnreachableMembers = 
localGossip.overview.unreachable.toIndexedSeq - val localUnreachableSize = localUnreachableMembers.size + val preferredGossipTargets = + if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view + // gossip to a random alive member with preference to a member with older or newer gossip version + val localMemberAddressesSet = localGossip.members map { _.address } + val nodesWithDifferentView = for { + (address, version) ← localGossip.overview.seen + if localMemberAddressesSet contains address + if version != localGossip.version + } yield address - // gossip to a random alive member with preference to a member - // with older or newer gossip version - val nodesWithdifferentView = { - val localMemberAddressesSet = localGossip.members map { _.address } - for { - (address, version) ← localGossip.overview.seen - if localMemberAddressesSet contains address - if version != localGossip.version - } yield address - } - val gossipedToAlive = - if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) - gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq) - else - gossipToRandomNodeOf(localMemberAddresses) + nodesWithDifferentView.toIndexedSeq + } else Vector.empty[Address] + gossipToRandomNodeOf( + if (preferredGossipTargets.nonEmpty) preferredGossipTargets + else localGossip.members.toIndexedSeq[Member].map(_.address) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved) + ) } } @@ -611,10 +583,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } // Leader actions are as follows: - // 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table - // 2. Move JOINING => UP -- When a node joins the cluster - // 3. Move LEAVING => EXITING -- When all partition handoff has completed - // 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader + // 1. Move JOINING => UP -- When a node joins the cluster + // 2. Move LEAVING => EXITING -- When all partition handoff has completed + // 3. Non-exiting remain -- When all partition handoff has completed + // 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table + // 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader // 5. Store away all stuff needed for the side-effecting processing in 10. // 6. Updating the vclock version for the changes // 7. Updating the 'seen' table @@ -633,28 +606,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) if (localGossip.convergence) { // we have convergence - so we can't have unreachable nodes - // transform the node member ring - filterNot/map/map - val newMembers = - localMembers filterNot { member ⇒ - // ---------------------- - // 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table - // ---------------------- - member.status == MemberStatus.Exiting - - } map { member ⇒ - // ---------------------- - // 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. 
we have a convergence) - // ---------------------- - if (member.status == Joining) member copy (status = Up) - else member - - } map { member ⇒ - // ---------------------- - // 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) - // ---------------------- - if (member.status == Leaving && hasPartionHandoffCompletedSuccessfully) member copy (status = Exiting) - else member - } + // transform the node member ring + val newMembers = localMembers collect { + // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) + case member if member.status == Joining ⇒ member copy (status = Up) + // 2. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) + case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ member copy (status = Exiting) + // 3. Everyone else that is not Exiting stays as they are + case member if member.status != Exiting ⇒ member + // 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table + } // ---------------------- // 5. Store away all stuff needed for the side-effecting processing in 10. @@ -669,7 +630,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining) - val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) + val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty @@ -682,22 +643,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip - (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member]) + (newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Member.none) } else if (AutoDown) { // we don't have convergence - so we might have unreachable nodes // if 'auto-down' is turned on, then try to auto-down any unreachable nodes - val newUnreachableMembers = localUnreachableMembers.map { member ⇒ + val newUnreachableMembers = localUnreachableMembers collect { // ---------------------- - // 5. Move UNREACHABLE => DOWN (auto-downing by leader) + // 6. 
Move UNREACHABLE => DOWN (auto-downing by leader) // ---------------------- - if (member.status == Down) member // no need to DOWN members already DOWN - else member copy (status = Down) + case member if member.status != Down ⇒ member copy (status = Down) + case downMember ⇒ downMember // no need to DOWN members already DOWN } // Check for the need to do side-effecting on successful state change - val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down) + val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down) // removing nodes marked as DOWN from the 'seen' table val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address } @@ -705,9 +666,9 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newGossip = localGossip copy (overview = newOverview) // update gossip - (newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers) + (newGossip, unreachableButNotDownedMembers.nonEmpty, Member.none, Member.none, Member.none, unreachableButNotDownedMembers) - } else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member]) + } else (localGossip, false, Member.none, Member.none, Member.none, Member.none) if (hasChangedState) { // we have a change of state - version it and try to update // ---------------------- @@ -769,20 +730,14 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys val deadline = Deadline.now + HeartbeatInterval - for (address ← beatTo; if address != selfAddress) - heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) + beatTo.foreach { address ⇒ if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) } } /** * Removes overdue joinInProgress from State. */ def removeOverdueJoinInProgress(): Unit = { - val overdueJoins = joinInProgress collect { - case (address, deadline) if deadline.isOverdue ⇒ address - } - if (overdueJoins.nonEmpty) { - joinInProgress = joinInProgress -- overdueJoins - } + joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index e4fa9f379e..a10899cf37 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -26,6 +26,8 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess */ object Member { + val none = Set.empty[Member] + /** * `Address` ordering type class, sorts addresses by host and port. 
*/ @@ -56,7 +58,7 @@ object Member { // group all members by Address => Seq[Member] val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) // pick highest MemberStatus - (Set.empty[Member] /: groupedByAddress) { + (Member.none /: groupedByAddress) { case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) } } From 03ea219a7f5e205f27d8345f4096640156e665e1 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 7 Jul 2012 20:58:46 +0200 Subject: [PATCH 2/3] Undoing ActorCell enhancements --- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 8bbb149858..f922daea70 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -508,7 +508,8 @@ private[akka] class ActorCell( } private def isTerminating = childrenRefs match { - case TerminatedChildrenContainer | TerminatingChildrenContainer(_, _, Termination) ⇒ true + case TerminatingChildrenContainer(_, _, Termination) ⇒ true + case TerminatedChildrenContainer ⇒ true case _ ⇒ false } private def isNormal = childrenRefs match { @@ -980,10 +981,11 @@ private[akka] class ActorCell( childrenRefs match { case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ val n = removeChild(child) + actor.supervisorStrategy.handleChildTerminated(this, child, children) if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" case Termination ⇒ doTerminate() - case _ ⇒ actor.supervisorStrategy.handleChildTerminated(this, child, children) + case _ ⇒ } case _ ⇒ removeChild(child) From 37d5a1e1353cf462ad16bab591a375e1a8e07968 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 11 Jul 2012 22:32:53 +0200 Subject: [PATCH 3/3] #2325 - Adding AkkaProtocolMessageSizeEstimator to NettyRemoteTransport --- .../remote/netty/NettyRemoteSupport.scala | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 5c506abfc8..b87f873cde 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -5,8 +5,7 @@ package akka.remote.netty import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.Executors import scala.collection.mutable.HashMap @@ -17,12 +16,13 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBa import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } import org.jboss.netty.handler.timeout.IdleStateHandler -import org.jboss.netty.util.HashedWheelTimer +import org.jboss.netty.util.{ DefaultObjectSizeEstimator, HashedWheelTimer } import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted 
} import akka.util.NonFatal import akka.actor.{ ExtendedActorSystem, Address, ActorRef } +import com.google.protobuf.MessageLite private[akka] object ChannelAddress extends ChannelLocal[Option[Address]] { override def initialValue(ch: Channel): Option[Address] = None @@ -108,9 +108,24 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider settings.MaxTotalMemorySize, settings.ExecutionPoolKeepalive.length, settings.ExecutionPoolKeepalive.unit, + AkkaProtocolMessageSizeEstimator, system.threadFactory))) else Nil + /** + * Helps keep track of how many bytes are in flight + */ + object AkkaProtocolMessageSizeEstimator extends DefaultObjectSizeEstimator { + override final def estimateSize(o: AnyRef): Int = + o match { + case proto: MessageLite ⇒ + val msgSize = proto.getSerializedSize + val misalignment = msgSize % 8 + if (misalignment != 0) msgSize + 8 - misalignment else msgSize + case msg ⇒ super.estimateSize(msg) + } + } + /** * Construct and authentication handler which uses the SecureCookie to somewhat * protect the TCP port from unauthorized use (don’t rely on it too much, though,
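
The size estimator added in PATCH 3 rounds each serialized protobuf message size up to the next multiple of 8 bytes before reporting it, so the OrderedMemoryAwareThreadPoolExecutor's in-flight byte accounting reflects aligned buffer sizes rather than raw payload sizes. Below is a minimal standalone sketch of just that rounding rule; it is not part of the patch, and the object and method names are illustrative only:

object SizeAlignmentSketch extends App {
  // Round a serialized message size up to the next multiple of 8 bytes,
  // mirroring the arithmetic in AkkaProtocolMessageSizeEstimator.estimateSize.
  def align8(msgSize: Int): Int = {
    val misalignment = msgSize % 8
    if (misalignment != 0) msgSize + 8 - misalignment else msgSize
  }

  // Already-aligned sizes pass through unchanged; everything else rounds up.
  Seq(0, 5, 8, 13, 64, 100) foreach { n ⇒ println(n + " -> " + align8(n)) }
}

Expected output: 0 -> 0, 5 -> 8, 8 -> 8, 13 -> 16, 64 -> 64, 100 -> 104.

Similarly, the leaderActions rewrite in PATCH 1 folds the old filterNot/map/map chain into a single collect whose cases both transform and drop members: Joining is promoted to Up, Leaving becomes Exiting once partition handoff has completed, members already Exiting match no case and are removed, and everyone else passes through unchanged. The following illustrative-only model of that pattern uses hypothetical status objects rather than Akka's MemberStatus:

object LeaderCollectSketch extends App {
  sealed trait Status
  case object Joining extends Status
  case object Up extends Status
  case object Leaving extends Status
  case object Exiting extends Status

  val hasPartitionHandoffCompletedSuccessfully = true
  val members = List(Joining, Up, Leaving, Exiting)

  val newMembers = members collect {
    case Joining ⇒ Up                                                  // JOINING => UP
    case Leaving if hasPartitionHandoffCompletedSuccessfully ⇒ Exiting // LEAVING => EXITING
    case s if s != Exiting ⇒ s                                         // everyone else stays as-is
  }                                                                    // EXITING matches no case and is dropped

  println(newMembers) // List(Up, Up, Exiting)
}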