From 99ad1e0eeb545ab87f3e2bc876b169bd5e4da9f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Fri, 5 Oct 2012 15:15:17 +0200 Subject: [PATCH 01/23] Document how to schedule periodic messages from an actor to itself. #2513 --- .../docs/pattern/SchedulerPatternTest.scala | 9 + .../pattern/SchedulerPatternTestBase.java | 189 ++++++++++++++++++ akka-docs/rst/java/howto.rst | 29 ++- .../docs/pattern/SchedulerPatternSpec.scala | 98 +++++++++ akka-docs/rst/scala/howto.rst | 29 ++- 5 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala create mode 100644 akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java create mode 100644 akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala new file mode 100644 index 0000000000..d450bbc090 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +import org.scalatest.junit.JUnitSuite + +class SchedulerPatternTest extends SchedulerPatternTestBase with JUnitSuite diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java new file mode 100644 index 0000000000..b2543bfb19 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java @@ -0,0 +1,189 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern; + +import akka.actor.*; +import akka.testkit.*; +import akka.testkit.TestEvent.Mute; +import akka.testkit.TestEvent.UnMute; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import scala.concurrent.util.Duration; +import scala.concurrent.util.FiniteDuration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class SchedulerPatternTestBase { + + ActorSystem system; + + @Before + public void setUp() { + system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf()); + } + + @After + public void tearDown() { + system.shutdown(); + } + + static + //#schedule-constructor + public class ScheduleInConstructor extends UntypedActor { + + private final Cancellable tick = getContext().system().scheduler().schedule( + Duration.create(500, TimeUnit.MILLISECONDS), + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + //#schedule-constructor + ActorRef target; + public ScheduleInConstructor(ActorRef target) { + this.target = target; + } + //#schedule-constructor + + @Override + public void postStop() { + tick.cancel(); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // do something useful here + //#schedule-constructor + target.tell(message, getSelf()); + //#schedule-constructor + } + //#schedule-constructor + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-constructor + else { + unhandled(message); + } + } + } + //#schedule-constructor + + static + //#schedule-receive + public class ScheduleInReceive extends UntypedActor { + //#schedule-receive + ActorRef target; + public ScheduleInReceive(ActorRef target) { + this.target = target; + } + //#schedule-receive + + @Override + public void preStart() { + getContext().system().scheduler().scheduleOnce( + Duration.create(500, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + } + + // override postRestart so we don't call preStart and schedule a new message + @Override + public void postRestart(Throwable reason) { + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // send another periodic tick after the specified delay + getContext().system().scheduler().scheduleOnce( + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + // do something useful here + //#schedule-receive + target.tell(message, getSelf()); + //#schedule-receive + } + //#schedule-receive + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-receive + else { + unhandled(message); + } + } + } + //#schedule-receive + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInConstructor() { + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInConstructor(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2000 millis")); + }}; + } + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInReceive() { + + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInReceive(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2500 millis")); + }}; + } + + public static class TestSchedule extends JavaTestKit { + private ActorSystem system; + + public TestSchedule(ActorSystem system) { + super(system); + this.system = system; + } + + public void testSchedule(final JavaTestKit probe, Props props, + FiniteDuration startDuration, + FiniteDuration afterRestartDuration) { + Iterable filter = + Arrays.asList(new akka.testkit.EventFilter[]{ + (akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)}); + system.eventStream().publish(new Mute(filter)); + + final ActorRef actor = system.actorOf(props); + new Within(startDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + actor.tell("restart", getRef()); + new Within(afterRestartDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + system.stop(actor); + + system.eventStream().publish(new UnMute(filter)); + } + } +} diff --git a/akka-docs/rst/java/howto.rst b/akka-docs/rst/java/howto.rst index 204d50e7dc..e1f9f1610b 100644 --- a/akka-docs/rst/java/howto.rst +++ b/akka-docs/rst/java/howto.rst @@ -17,6 +17,34 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library You might find some of the patterns described in the Scala chapter of :ref:`howto-scala` useful even though the example code is written in Scala. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduler will be restarted with the actor on restarts. + +.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-receive + Template Pattern ================ @@ -33,4 +61,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. - diff --git a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala new file mode 100644 index 0000000000..e79d94b2c5 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +import language.postfixOps + +import akka.actor.{ Props, ActorRef, Actor } +import scala.concurrent.util.duration._ +import scala.concurrent.util.{ FiniteDuration, Duration } +import akka.testkit.{ TimingTest, AkkaSpec, filterException } +import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor + +object SchedulerPatternSpec { + //#schedule-constructor + class ScheduleInConstructor extends Actor { + import context._ + + val tick = system.scheduler.schedule(500 millis, 1000 millis, self, "tick") + //#schedule-constructor + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-constructor + + override def postStop() = tick.cancel() + + def receive = { + case "tick" ⇒ + // do something useful here + //#schedule-constructor + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-constructor + } + } + //#schedule-constructor + + //#schedule-receive + class ScheduleInReceive extends Actor { + import context._ + //#schedule-receive + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-receive + + override def preStart() = + system.scheduler.scheduleOnce(500 millis, self, "tick") + + // override postRestart so we don't call preStart and schedule a new message + override def postRestart(reason: Throwable) = {} + + def receive = { + case "tick" ⇒ + // send another periodic tick after the specified delay + system.scheduler.scheduleOnce(1000 millis, self, "tick") + // do something useful here + //#schedule-receive + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-receive + } + } + //#schedule-receive +} + +class SchedulerPatternSpec extends AkkaSpec { + + def testSchedule(actor: ActorRef, startDuration: FiniteDuration, + afterRestartDuration: FiniteDuration) = { + + filterException[ArithmeticException] { + within(startDuration) { + expectMsg("tick") + expectMsg("tick") + expectMsg("tick") + } + actor ! "restart" + within(afterRestartDuration) { + expectMsg("tick") + expectMsg("tick") + } + system.stop(actor) + } + } + + "send periodic ticks from the constructor" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2000 millis) + } + + "send ticks from the preStart and receive" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2500 millis) + } +} diff --git a/akka-docs/rst/scala/howto.rst b/akka-docs/rst/scala/howto.rst index 7d064e2491..c5203adb1c 100644 --- a/akka-docs/rst/scala/howto.rst +++ b/akka-docs/rst/scala/howto.rst @@ -111,6 +111,34 @@ This is where the Spider pattern comes in." The pattern is described `Discovering Message Flows in Actor System with the Spider Pattern `_. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduler will be restarted with the actor on restarts. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-receive + Template Pattern ================ @@ -127,4 +155,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. - From cecde67226ec13e79d503429680cd6a62c34b651 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 1 Oct 2012 10:02:48 +0200 Subject: [PATCH 02/23] Move heartbeat sending out from ClusterCoreDaemon, see #2284 --- .../scala/akka/cluster/ClusterDaemon.scala | 44 ++-------- .../scala/akka/cluster/ClusterHeartbeat.scala | 88 +++++++++++++++++-- 2 files changed, 86 insertions(+), 46 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 6012c48f45..10bcd9ee6a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -155,8 +155,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac withDispatcher(context.props.dispatcher), name = "publisher") val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)). withDispatcher(context.props.dispatcher), name = "core") - context.actorOf(Props[ClusterHeartbeatDaemon]. - withDispatcher(context.props.dispatcher), name = "heartbeat") + context.actorOf(Props[ClusterHeartbeatReceiver]. + withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)). withDispatcher(context.props.dispatcher), name = "metrics") @@ -172,26 +172,24 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging { import ClusterLeaderAction._ import InternalClusterAction._ - import ClusterHeartbeatSender._ + import ClusterHeartbeatSender.JoinInProgress val cluster = Cluster(context.system) import cluster.{ selfAddress, scheduler, failureDetector } import cluster.settings._ val vclockNode = VectorClock.Node(selfAddress.toString) - val selfHeartbeat = Heartbeat(selfAddress) // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet var latestGossip: Gossip = Gossip() - var joinInProgress: Map[Address, Deadline] = Map.empty var stats = ClusterStats() - val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. - withDispatcher(UseDispatcher), name = "heartbeatSender") val coreSender = context.actorOf(Props[ClusterCoreSender]. withDispatcher(UseDispatcher), name = "coreSender") + val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. + withDispatcher(UseDispatcher), name = "heartbeatSender") import context.dispatcher @@ -201,12 +199,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto self ! GossipTick } - // start periodic heartbeat to all nodes in cluster - val heartbeatTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { - self ! HeartbeatTick - } - // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) val failureDetectorReaperTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) { @@ -232,7 +224,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto override def postStop(): Unit = { gossipTask.cancel() - heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() publishStatsTask foreach { _.cancel() } @@ -250,7 +241,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() - case HeartbeatTick ⇒ heartbeat() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() case PublishStatsTick ⇒ publishInternalStats() @@ -293,11 +283,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localGossip = latestGossip // wipe our state since a node that joins a cluster must be empty latestGossip = Gossip() - joinInProgress = Map(address -> (Deadline.now + JoinTimeout)) // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() + heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout) publish(localGossip) context.become(initialized) @@ -517,12 +507,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer else remoteGossip // remote gossip is newer - val newJoinInProgress = - if (joinInProgress.isEmpty) joinInProgress - else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address) - latestGossip = winningGossip seen selfAddress - joinInProgress = newJoinInProgress // for all new joining nodes we remove them from the failure detector (latestGossip.members -- localGossip.members).foreach { @@ -744,27 +729,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } } - def heartbeat(): Unit = { - removeOverdueJoinInProgress() - - val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys - - val deadline = Deadline.now + HeartbeatInterval - beatTo.foreach { address ⇒ if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) } - } - - /** - * Removes overdue joinInProgress from State. - */ - def removeOverdueJoinInProgress(): Unit = { - joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } - } - /** * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. */ def reapUnreachableMembers(): Unit = { - if (!isSingletonCluster && isAvailable) { // only scrutinize if we are a non-singleton cluster and available diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index b48c9f066b..b28542bf24 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -4,12 +4,14 @@ package akka.cluster import language.postfixOps - +import scala.collection.immutable.SortedSet import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } import java.security.MessageDigest import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } import scala.concurrent.util.duration._ import scala.concurrent.util.Deadline +import scala.concurrent.util.FiniteDuration +import akka.cluster.ClusterEvent._ /** * Sent at regular intervals for failure detection. @@ -19,11 +21,11 @@ case class Heartbeat(from: Address) extends ClusterMessage /** * INTERNAL API. * - * Receives Heartbeat messages and delegates to Cluster. + * Receives Heartbeat messages and updates failure detector. * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized * to Cluster message after message, but concurrent with other types of messages. */ -private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { val failureDetector = Cluster(context.system).failureDetector @@ -38,12 +40,18 @@ private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogg */ private[cluster] object ClusterHeartbeatSender { /** - * - * Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]] + * Command to [akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]] * to the other node. * Local only, no need to serialize. */ case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) + + /** + * Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of + * another node and heartbeats should be sent until it becomes member or deadline is overdue. + * Local only, no need to serialize. + */ + case class JoinInProgress(address: Address, deadline: Deadline) } /* @@ -57,12 +65,39 @@ private[cluster] object ClusterHeartbeatSender { */ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ + import Member.addressOrdering + import InternalClusterAction.HeartbeatTick + + val cluster = Cluster(context.system) + import cluster.{ selfAddress, scheduler } + import cluster.settings._ + import context.dispatcher + + val selfHeartbeat = Heartbeat(selfAddress) + + var nodes: SortedSet[Address] = SortedSet.empty + var joinInProgress: Map[Address, Deadline] = Map.empty + + // start periodic heartbeat to other nodes in cluster + val heartbeatTask = + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { + self ! HeartbeatTick + } + + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + } + + override def postStop(): Unit = { + heartbeatTask.cancel() + cluster.unsubscribe(self) + } /** * Looks up and returns the remote cluster heartbeat connection for the specific address. */ def clusterHeartbeatConnectionFor(address: Address): ActorRef = - context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") + context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") val digester = MessageDigest.getInstance("MD5") @@ -76,14 +111,51 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg } def receive = { - case msg @ SendHeartbeat(from, to, deadline) ⇒ + case state: CurrentClusterState ⇒ init(state) + case MemberUnreachable(m) ⇒ removeMember(m) + case MemberRemoved(m) ⇒ removeMember(m) + case e: MemberEvent ⇒ addMember(e.member) + case JoinInProgress(a, d) ⇒ joinInProgress += (a -> d) + case HeartbeatTick ⇒ heartbeat() + } + + def init(state: CurrentClusterState): Unit = { + nodes = state.members.map(_.address) + joinInProgress --= nodes + } + + def addMember(m: Member): Unit = { + nodes += m.address + joinInProgress -= m.address + } + + def removeMember(m: Member): Unit = { + nodes -= m.address + joinInProgress -= m.address + } + + def heartbeat(): Unit = { + removeOverdueJoinInProgress() + + val beatTo = nodes ++ joinInProgress.keys + + val deadline = Deadline.now + HeartbeatInterval + for (to ← beatTo; if to != selfAddress) { val workerName = encodeChildName(to.toString) val worker = context.actorFor(workerName) match { case notFound if notFound.isTerminated ⇒ context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) case child ⇒ child } - worker ! msg + worker ! SendHeartbeat(selfHeartbeat, to, deadline) + } + } + + /** + * Removes overdue joinInProgress from State. + */ + def removeOverdueJoinInProgress(): Unit = { + joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue ⇒ address } } } From 7557433491f39f88797c3e869aafc493abba2869 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 1 Oct 2012 11:15:29 +0200 Subject: [PATCH 03/23] URLEncode heartbeat sender child names * Names can be url encoded now, instead of MD5 --- .../main/scala/akka/cluster/ClusterHeartbeat.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index b28542bf24..734c923df5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -6,12 +6,12 @@ package akka.cluster import language.postfixOps import scala.collection.immutable.SortedSet import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } -import java.security.MessageDigest import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } import scala.concurrent.util.duration._ import scala.concurrent.util.Deadline import scala.concurrent.util.FiniteDuration import akka.cluster.ClusterEvent._ +import java.net.URLEncoder /** * Sent at regular intervals for failure detection. @@ -99,16 +99,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def clusterHeartbeatConnectionFor(address: Address): ActorRef = context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") - val digester = MessageDigest.getInstance("MD5") - /** - * Child name is MD5 hash of the address. - * FIXME Change to URLEncode when ticket #2123 has been fixed + * Child name URL encoded target address. */ - def encodeChildName(name: String): String = { - digester update name.getBytes("UTF-8") - digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString - } + def encodeChildName(name: String): String = URLEncoder.encode(name, "UTF-8") def receive = { case state: CurrentClusterState ⇒ init(state) From 3f73705abc7ca77e3f6166b588cb3ac060307c28 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 1 Oct 2012 14:12:20 +0200 Subject: [PATCH 04/23] Use consistent hash to heartbeat to a few nodes instead of all, see #2284 * Previously heartbeat messages was sent to all other members, i.e. each member was monitored by all other members in the cluster. * This was the number one know scalability bottleneck, due to the number of interconnections. * Limit sending of heartbeats to a few (5) members. Select and re-balance with consistent hashing algorithm when new members are added or removed. * Send a few EndHeartbeat when ending send of Heartbeat messages. --- .gitignore | 1 + .../src/main/resources/reference.conf | 4 + .../src/main/scala/akka/cluster/Cluster.scala | 4 +- .../scala/akka/cluster/ClusterDaemon.scala | 2 +- .../scala/akka/cluster/ClusterHeartbeat.scala | 210 +++++++++++++----- .../scala/akka/cluster/ClusterSettings.scala | 3 + .../scala/akka/cluster/LargeClusterSpec.scala | 6 +- .../akka/cluster/ClusterConfigSpec.scala | 2 + 8 files changed, 172 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index f646a4c173..18fb5d762a 100755 --- a/.gitignore +++ b/.gitignore @@ -67,3 +67,4 @@ redis/ beanstalk/ .scalastyle bin/ +.worksheet diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 4347f6c0b0..a1215f4563 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -78,6 +78,10 @@ akka { # how often should the node send out heartbeats? heartbeat-interval = 1s + # Number of member nodes that each member will send heartbeat messages to, + # i.e. each node will be monitored by this number of other nodes. + monitored-by-nr-of-members = 5 + # defines the failure detector threshold # A low threshold is prone to generate many wrong suspicions but ensures # a quick detection in the event of a real crash. Conversely, a high diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 25b1cd684b..6863b1224e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -62,7 +62,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { val settings = new ClusterSettings(system.settings.config, system.name) import settings._ - val selfAddress = system.provider match { + val selfAddress: Address = system.provider match { case c: ClusterActorRefProvider ⇒ c.transport.address case other ⇒ throw new ConfigurationException( "ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]". @@ -74,7 +74,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { log.info("Cluster Node [{}] - is starting up...", selfAddress) - val failureDetector = { + val failureDetector: FailureDetector = { import settings.{ FailureDetectorImplementationClass ⇒ fqcn } system.dynamicAccess.createInstanceFor[FailureDetector]( fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 10bcd9ee6a..1b4398feeb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -287,8 +287,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() - heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout) publish(localGossip) + heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout) context.become(initialized) if (address == selfAddress) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 734c923df5..4e80223dbf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -5,18 +5,32 @@ package akka.cluster import language.postfixOps import scala.collection.immutable.SortedSet -import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } -import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } +import scala.annotation.tailrec import scala.concurrent.util.duration._ import scala.concurrent.util.Deadline import scala.concurrent.util.FiniteDuration -import akka.cluster.ClusterEvent._ import java.net.URLEncoder +import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props } +import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } +import akka.cluster.ClusterEvent._ +import akka.routing.ConsistentHash /** - * Sent at regular intervals for failure detection. + * INTERNAL API */ -case class Heartbeat(from: Address) extends ClusterMessage +private[akka] object ClusterHeartbeatReceiver { + /** + * Sent at regular intervals for failure detection. + */ + case class Heartbeat(from: Address) extends ClusterMessage + + /** + * Tell failure detector at receiving side that it should + * remove the monitoring, because heartbeats will end from + * this node. + */ + case class EndHeartbeat(from: Address) extends ClusterMessage +} /** * INTERNAL API. @@ -26,11 +40,13 @@ case class Heartbeat(from: Address) extends ClusterMessage * to Cluster message after message, but concurrent with other types of messages. */ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { + import ClusterHeartbeatReceiver._ val failureDetector = Cluster(context.system).failureDetector def receive = { - case Heartbeat(from) ⇒ failureDetector heartbeat from + case Heartbeat(from) ⇒ failureDetector heartbeat from + case EndHeartbeat(from) ⇒ failureDetector remove from } } @@ -39,16 +55,11 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo * INTERNAL API */ private[cluster] object ClusterHeartbeatSender { - /** - * Command to [akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]] - * to the other node. - * Local only, no need to serialize. - */ - case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) - /** * Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of - * another node and heartbeats should be sent until it becomes member or deadline is overdue. + * another node and heartbeats should be sent unconditionally until it becomes + * member or deadline is overdue. This is done to be able to detect immediate death + * of the joining node. * Local only, no need to serialize. */ case class JoinInProgress(address: Address, deadline: Deadline) @@ -58,14 +69,17 @@ private[cluster] object ClusterHeartbeatSender { * INTERNAL API * * This actor is responsible for sending the heartbeat messages to - * other nodes. Netty blocks when sending to broken connections. This actor - * isolates sending to different nodes by using child workers for each target + * a few other nodes that will monitor this node. + * + * Netty blocks when sending to broken connections. This actor + * isolates sending to different nodes by using child actors for each target * address and thereby reduce the risk of irregular heartbeats to healty * nodes due to broken connections to other nodes. */ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ - import Member.addressOrdering + import ClusterHeartbeatSenderConnection._ + import ClusterHeartbeatReceiver._ import InternalClusterAction.HeartbeatTick val cluster = Cluster(context.system) @@ -74,9 +88,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg import context.dispatcher val selfHeartbeat = Heartbeat(selfAddress) + val selfEndHeartbeat = EndHeartbeat(selfAddress) + val selfAddressStr = selfAddress.toString - var nodes: SortedSet[Address] = SortedSet.empty - var joinInProgress: Map[Address, Deadline] = Map.empty + var all = Set.empty[Address] + var current = Set.empty[Address] + var ending = Map.empty[Address, Int] + var joinInProgress = Map.empty[Address, Deadline] + var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor) // start periodic heartbeat to other nodes in cluster val heartbeatTask = @@ -99,63 +118,146 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def clusterHeartbeatConnectionFor(address: Address): ActorRef = context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") - /** - * Child name URL encoded target address. - */ - def encodeChildName(name: String): String = URLEncoder.encode(name, "UTF-8") - def receive = { + case HeartbeatTick ⇒ heartbeat() case state: CurrentClusterState ⇒ init(state) case MemberUnreachable(m) ⇒ removeMember(m) case MemberRemoved(m) ⇒ removeMember(m) case e: MemberEvent ⇒ addMember(e.member) - case JoinInProgress(a, d) ⇒ joinInProgress += (a -> d) - case HeartbeatTick ⇒ heartbeat() + case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) } def init(state: CurrentClusterState): Unit = { - nodes = state.members.map(_.address) - joinInProgress --= nodes + all = state.members.collect { case m if m.address != selfAddress ⇒ m.address } + joinInProgress --= all + consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor) } - def addMember(m: Member): Unit = { - nodes += m.address - joinInProgress -= m.address + def addMember(m: Member): Unit = if (m.address != selfAddress) { + all += m.address + consistentHash = consistentHash :+ m.address + removeJoinInProgress(m.address) + update() } - def removeMember(m: Member): Unit = { - nodes -= m.address - joinInProgress -= m.address + def removeMember(m: Member): Unit = if (m.address != selfAddress) { + all -= m.address + consistentHash = consistentHash :- m.address + removeJoinInProgress(m.address) + update() + } + + def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) { + joinInProgress -= address + ending += (address -> 0) + } + + def addJoinInProgress(address: Address, deadline: Deadline): Unit = { + if (address != selfAddress && !all.contains(address)) + joinInProgress += (address -> deadline) } def heartbeat(): Unit = { removeOverdueJoinInProgress() - val beatTo = nodes ++ joinInProgress.keys - - val deadline = Deadline.now + HeartbeatInterval - for (to ← beatTo; if to != selfAddress) { - val workerName = encodeChildName(to.toString) - val worker = context.actorFor(workerName) match { + def connection(to: Address): ActorRef = { + // URL encoded target address as child actor name + val connectionName = URLEncoder.encode(to.toString, "UTF-8") + context.actorFor(connectionName) match { case notFound if notFound.isTerminated ⇒ - context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) + context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName) case child ⇒ child } - worker ! SendHeartbeat(selfHeartbeat, to, deadline) + } + + val deadline = Deadline.now + HeartbeatInterval + (current ++ joinInProgress.keys) foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) } + + // When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is + // sent to notify it that no more heartbeats will be sent. + for ((to, count) ← ending) { + val c = connection(to) + c ! SendEndHeartbeat(selfEndHeartbeat, to) + if (count == NumberOfEndHeartbeats) { + ending -= to + c ! PoisonPill + } else { + ending += (to -> (count + 1)) + } } } /** - * Removes overdue joinInProgress from State. + * Update current peers to send heartbeats to, and + * keep track of which nodes to stop sending heartbeats to. + */ + def update(): Unit = { + val previous = current + current = selectPeers + // start ending process for nodes not selected any more + ending ++= (previous -- current).map(_ -> 0) + // abort ending process for nodes that have been selected again + ending --= current + } + + /** + * Select a few peers that heartbeats will be sent to, i.e. that will + * monitor this node. Try to send heartbeats to same nodes as much + * as possible, but re-balance with consistent hashing algorithm when + * new members are added or removed. + */ + def selectPeers: Set[Address] = { + val allSize = all.size + val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers) + // try more if consistentHash results in same node as already selected + val attemptLimit = nrOfPeers * 2 + @tailrec def select(acc: Set[Address], n: Int): Set[Address] = { + if (acc.size == nrOfPeers || n == attemptLimit) acc + else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1) + } + if (nrOfPeers >= allSize) all + else select(Set.empty[Address], 0) + } + + /** + * Cleanup overdue joinInProgress, in case a joining node never + * became member, for some reason. */ def removeOverdueJoinInProgress(): Unit = { - joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue ⇒ address } + val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } + if (overdue.nonEmpty) { + log.info("Overdue join in progress [{}]", overdue.mkString(", ")) + ending ++= overdue.map(_ -> 0) + joinInProgress --= overdue + } } } /** - * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address. + * INTERNAL API + */ +private[cluster] object ClusterHeartbeatSenderConnection { + import ClusterHeartbeatReceiver._ + + /** + * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send + * [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node. + * Local only, no need to serialize. + */ + case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) + + /** + * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send + * [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node. + * Local only, no need to serialize. + */ + case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address) +} + +/** + * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] + * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address. * * Netty blocks when sending to broken connections, and this actor uses * a configurable circuit breaker to reduce connect attempts to broken @@ -163,10 +265,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg * * @see ClusterHeartbeatSender */ -private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) +private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef) extends Actor with ActorLogging { - import ClusterHeartbeatSender._ + import ClusterHeartbeatSenderConnection._ val breaker = { val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings @@ -177,21 +279,19 @@ private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) } - // make sure it will cleanup when not used any more - context.setReceiveTimeout(30 seconds) - def receive = { case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ if (!deadline.isOverdue) { + log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) // the CircuitBreaker will measure elapsed time and open if too many long calls try breaker.withSyncCircuitBreaker { - log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) toRef ! heartbeatMsg - if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef) } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } } - - case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used - + if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef) + case SendEndHeartbeat(endHeartbeatMsg, _) ⇒ + log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef) + toRef ! endHeartbeatMsg } + } \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 6110df034a..38a3f4554c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -24,6 +24,9 @@ class ClusterSettings(val config: Config, val systemName: String) { final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration + final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt + final val MonitoredByNrOfMembers = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members") final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index e5c72e642b..5f1edfc6db 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -42,7 +42,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { gossip-interval = 500 ms auto-join = off auto-down = on - failure-detector.acceptable-heartbeat-pause = 10s + failure-detector.acceptable-heartbeat-pause = 5s publish-stats-interval = 0 s # always, when it happens } akka.event-handlers = ["akka.testkit.TestEventListener"] @@ -57,7 +57,9 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { akka.scheduler.tick-duration = 33 ms akka.remote.log-remote-lifecycle-events = off akka.remote.netty.execution-pool-size = 4 - #akka.remote.netty.reconnection-time-window = 1s + #akka.remote.netty.reconnection-time-window = 10s + akka.remote.netty.read-timeout = 5s + akka.remote.netty.write-timeout = 5s akka.remote.netty.backoff-timeout = 500ms akka.remote.netty.connection-timeout = 500ms diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index be5ae74e4d..d83753fb00 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -29,6 +29,8 @@ class ClusterConfigSpec extends AkkaSpec { PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) + NumberOfEndHeartbeats must be(4) + MonitoredByNrOfMembers must be(5) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) PublishStatsInterval must be(10 second) From c63234ca4cbcf63e580ca041d2becd99e834792b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Mon, 8 Oct 2012 10:52:55 +0200 Subject: [PATCH 05/23] Changes according to review. #2513 --- ...estBase.java => SchedulerPatternTest.java} | 70 ++++++++++--------- .../docs/pattern/SchedulerPatternTest.scala | 9 --- akka-docs/rst/java/howto.rst | 9 ++- .../docs/pattern/SchedulerPatternSpec.scala | 7 +- akka-docs/rst/scala/howto.rst | 5 +- 5 files changed, 50 insertions(+), 50 deletions(-) rename akka-docs/rst/java/code/docs/pattern/{SchedulerPatternTestBase.java => SchedulerPatternTest.java} (76%) delete mode 100644 akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java similarity index 76% rename from akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java rename to akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java index b2543bfb19..05546232aa 100644 --- a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java @@ -8,26 +8,23 @@ import akka.actor.*; import akka.testkit.*; import akka.testkit.TestEvent.Mute; import akka.testkit.TestEvent.UnMute; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.*; import scala.concurrent.util.Duration; import scala.concurrent.util.FiniteDuration; import java.util.Arrays; import java.util.concurrent.TimeUnit; -public class SchedulerPatternTestBase { +public class SchedulerPatternTest { - ActorSystem system; + static ActorSystem system; - @Before - public void setUp() { + @BeforeClass + public static void setUp() { system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf()); } - @After - public void tearDown() { + @AfterClass + public static void tearDown() { system.shutdown(); } @@ -38,9 +35,10 @@ public class SchedulerPatternTestBase { private final Cancellable tick = getContext().system().scheduler().schedule( Duration.create(500, TimeUnit.MILLISECONDS), Duration.create(1000, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().system().dispatcher()); + getSelf(), "tick", getContext().dispatcher()); //#schedule-constructor - ActorRef target; + // this variable and constructor is declared here to not show up in the docs + final ActorRef target; public ScheduleInConstructor(ActorRef target) { this.target = target; } @@ -75,7 +73,8 @@ public class SchedulerPatternTestBase { //#schedule-receive public class ScheduleInReceive extends UntypedActor { //#schedule-receive - ActorRef target; + // this variable and constructor is declared here to not show up in the docs + final ActorRef target; public ScheduleInReceive(ActorRef target) { this.target = target; } @@ -85,7 +84,7 @@ public class SchedulerPatternTestBase { public void preStart() { getContext().system().scheduler().scheduleOnce( Duration.create(500, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().system().dispatcher()); + getSelf(), "tick", getContext().dispatcher()); } // override postRestart so we don't call preStart and schedule a new message @@ -99,7 +98,7 @@ public class SchedulerPatternTestBase { // send another periodic tick after the specified delay getContext().system().scheduler().scheduleOnce( Duration.create(1000, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().system().dispatcher()); + getSelf(), "tick", getContext().dispatcher()); // do something useful here //#schedule-receive target.tell(message, getSelf()); @@ -164,26 +163,29 @@ public class SchedulerPatternTestBase { Iterable filter = Arrays.asList(new akka.testkit.EventFilter[]{ (akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)}); - system.eventStream().publish(new Mute(filter)); + try { + system.eventStream().publish(new Mute(filter)); - final ActorRef actor = system.actorOf(props); - new Within(startDuration) { - protected void run() { - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - } - }; - actor.tell("restart", getRef()); - new Within(afterRestartDuration) { - protected void run() { - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - } - }; - system.stop(actor); - - system.eventStream().publish(new UnMute(filter)); + final ActorRef actor = system.actorOf(props); + new Within(startDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + actor.tell("restart", getRef()); + new Within(afterRestartDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + system.stop(actor); + } + finally { + system.eventStream().publish(new UnMute(filter)); + } } } } diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala deleted file mode 100644 index d450bbc090..0000000000 --- a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala +++ /dev/null @@ -1,9 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package docs.pattern - -import org.scalatest.junit.JUnitSuite - -class SchedulerPatternTest extends SchedulerPatternTestBase with JUnitSuite diff --git a/akka-docs/rst/java/howto.rst b/akka-docs/rst/java/howto.rst index e1f9f1610b..922d318c75 100644 --- a/akka-docs/rst/java/howto.rst +++ b/akka-docs/rst/java/howto.rst @@ -29,9 +29,12 @@ message sends to the same actor. .. note:: - With this approach the scheduler will be restarted with the actor on restarts. + With this approach the scheduled periodic message send will be restarted with the actor on restarts. + This also means that the time period that elapses between two tick messages during a restart may drift + off based on when you restart the scheduled message sends relative to the time that the last message was + sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``. -.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-constructor +.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-constructor The second variant sets up an initial one shot message send in the ``preStart`` method of the actor, and the then the actor when it receives this message sets up a new one shot @@ -43,7 +46,7 @@ and schedule the initial message send again. With this approach we won't fill up the mailbox with tick messages if the actor is under pressure, but only schedule a new tick message when we have seen the previous one. -.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-receive +.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-receive Template Pattern ================ diff --git a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala index e79d94b2c5..a669cb0bc5 100644 --- a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala +++ b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala @@ -15,10 +15,10 @@ import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor object SchedulerPatternSpec { //#schedule-constructor class ScheduleInConstructor extends Actor { - import context._ - - val tick = system.scheduler.schedule(500 millis, 1000 millis, self, "tick") + val tick = + context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick") //#schedule-constructor + // this var and constructor is declared here to not show up in the docs var target: ActorRef = null def this(target: ActorRef) = { this(); this.target = target } //#schedule-constructor @@ -41,6 +41,7 @@ object SchedulerPatternSpec { class ScheduleInReceive extends Actor { import context._ //#schedule-receive + // this var and constructor is declared here to not show up in the docs var target: ActorRef = null def this(target: ActorRef) = { this(); this.target = target } //#schedule-receive diff --git a/akka-docs/rst/scala/howto.rst b/akka-docs/rst/scala/howto.rst index c5203adb1c..dcdebe06db 100644 --- a/akka-docs/rst/scala/howto.rst +++ b/akka-docs/rst/scala/howto.rst @@ -123,7 +123,10 @@ message sends to the same actor. .. note:: - With this approach the scheduler will be restarted with the actor on restarts. + With this approach the scheduled periodic message send will be restarted with the actor on restarts. + This also means that the time period that elapses between two tick messages during a restart may drift + off based on when you restart the scheduled message sends relative to the time that the last message was + sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``. .. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor From 59f8210b85c4a5f2eaf88d1bf387ccdbb1cdc554 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 9 Oct 2012 17:54:54 +0200 Subject: [PATCH 06/23] Incorporate review comments, see #2284 --- .../scala/akka/cluster/ClusterHeartbeat.scala | 14 ++++---- .../scala/akka/cluster/ClusterSettings.scala | 35 ++++++++++++++----- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 4e80223dbf..232a890910 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -103,9 +103,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg self ! HeartbeatTick } - override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) - } + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def postStop(): Unit = { heartbeatTask.cancel() @@ -131,6 +129,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg all = state.members.collect { case m if m.address != selfAddress ⇒ m.address } joinInProgress --= all consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor) + update() } def addMember(m: Member): Unit = if (m.address != selfAddress) { @@ -259,11 +258,11 @@ private[cluster] object ClusterHeartbeatSenderConnection { * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address. * - * Netty blocks when sending to broken connections, and this actor uses - * a configurable circuit breaker to reduce connect attempts to broken + * This actor exists only because Netty blocks when sending to broken connections, + * and this actor uses a configurable circuit breaker to reduce connect attempts to broken * connections. * - * @see ClusterHeartbeatSender + * @see akka.cluster.ClusterHeartbeatSender */ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef) extends Actor with ActorLogging { @@ -283,7 +282,8 @@ private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef) case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ if (!deadline.isOverdue) { log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) - // the CircuitBreaker will measure elapsed time and open if too many long calls + // Netty blocks when sending to broken connections, the CircuitBreaker will + // measure elapsed time and open if too many long calls try breaker.withSyncCircuitBreaker { toRef ! heartbeatMsg } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 38a3f4554c..fa35bc25a8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -16,17 +16,34 @@ import scala.concurrent.util.FiniteDuration class ClusterSettings(val config: Config, val systemName: String) { import config._ - final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold") - final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") - final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class") - final val FailureDetectorMinStdDeviation: FiniteDuration = - Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) - final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = - Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + final val FailureDetectorThreshold: Double = { + val x = getDouble("akka.cluster.failure-detector.threshold") + require(x > 0.0, "failure-detector.threshold must be > 0") + x + } + final val FailureDetectorMaxSampleSize: Int = { + val n = getInt("akka.cluster.failure-detector.max-sample-size") + require(n > 0, "failure-detector.max-sample-size must be > 0"); n + } + final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class") + final val FailureDetectorMinStdDeviation: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) + require(d > Duration.Zero, "failure-detector.min-std-deviation must be > 0"); d + } + final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) + require(d >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0"); d + } + final val HeartbeatInterval: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d + } final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt - final val MonitoredByNrOfMembers = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members") + final val MonitoredByNrOfMembers: Int = { + val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members") + require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n + } final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr From 66c81e915e818598c1b564b120fb2f6f2f990240 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 10 Oct 2012 15:23:18 +0200 Subject: [PATCH 07/23] Move state of ClusterHeartbeatSender to separate immutable class, see #2284 --- .../scala/akka/routing/ConsistentHash.scala | 2 +- .../scala/akka/cluster/ClusterHeartbeat.scala | 206 +++++++++++------- .../ClusterHeartbeatSenderStateSpec.scala | 88 ++++++++ 3 files changed, 221 insertions(+), 75 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 79c31cda33..d3bef92e6c 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -18,7 +18,7 @@ import java.util.Arrays * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) { +class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) { import ConsistentHash._ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 561f3c85ba..325e0aae25 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -89,13 +89,9 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg val selfHeartbeat = Heartbeat(selfAddress) val selfEndHeartbeat = EndHeartbeat(selfAddress) - val selfAddressStr = selfAddress.toString - var all = Set.empty[Address] - var current = Set.empty[Address] - var ending = Map.empty[Address, Int] - var joinInProgress = Map.empty[Address, Deadline] - var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor) + var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor), + selfAddress.toString, MonitoredByNrOfMembers) // start periodic heartbeat to other nodes in cluster val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], @@ -115,47 +111,31 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") def receive = { - case HeartbeatTick ⇒ heartbeat() - case state: CurrentClusterState ⇒ init(state) - case MemberUnreachable(m) ⇒ removeMember(m) - case MemberRemoved(m) ⇒ removeMember(m) - case e: MemberEvent ⇒ addMember(e.member) - case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) + case HeartbeatTick ⇒ heartbeat() + case s: CurrentClusterState ⇒ reset(s) + case MemberUnreachable(m) ⇒ removeMember(m) + case MemberRemoved(m) ⇒ removeMember(m) + case e: MemberEvent ⇒ addMember(e.member) + case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) } - def init(state: CurrentClusterState): Unit = { - all = state.members.collect { case m if m.address != selfAddress ⇒ m.address } - joinInProgress --= all - consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor) - update() - } + def reset(snapshot: CurrentClusterState): Unit = + state = state.reset(snapshot.members.collect { case m if m.address != selfAddress ⇒ m.address }) - def addMember(m: Member): Unit = if (m.address != selfAddress) { - all += m.address - consistentHash = consistentHash :+ m.address - removeJoinInProgress(m.address) - update() - } + def addMember(m: Member): Unit = if (m.address != selfAddress) + state = state addMember m.address - def removeMember(m: Member): Unit = if (m.address != selfAddress) { - all -= m.address - consistentHash = consistentHash :- m.address - removeJoinInProgress(m.address) - update() - } + def removeMember(m: Member): Unit = if (m.address != selfAddress) + state = state removeMember m.address - def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) { - joinInProgress -= address - ending += (address -> 0) - } + def removeJoinInProgress(address: Address): Unit = if (address != selfAddress) + state = state.removeJoinInProgress(address) - def addJoinInProgress(address: Address, deadline: Deadline): Unit = { - if (address != selfAddress && !all.contains(address)) - joinInProgress += (address -> deadline) - } + def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress) + state = state.addJoinInProgress(address, deadline) def heartbeat(): Unit = { - removeOverdueJoinInProgress() + state = state.removeOverdueJoinInProgress() def connection(to: Address): ActorRef = { // URL encoded target address as child actor name @@ -168,67 +148,145 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg } val deadline = Deadline.now + HeartbeatInterval - (current ++ joinInProgress.keys) foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) } + state.active foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) } // When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is // sent to notify it that no more heartbeats will be sent. - for ((to, count) ← ending) { + for ((to, count) ← state.ending) { val c = connection(to) c ! SendEndHeartbeat(selfEndHeartbeat, to) if (count == NumberOfEndHeartbeats) { - ending -= to + state = state.removeEnding(to) c ! PoisonPill - } else { - ending += (to -> (count + 1)) - } + } else + state = state.increaseEndingCount(to) } } +} + +/** + * INTERNAL API + */ +private[cluster] object ClusterHeartbeatSenderState { /** - * Update current peers to send heartbeats to, and + * Initial, empty state + */ + def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String, + monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers) + + /** + * Create a new state based on previous state, and * keep track of which nodes to stop sending heartbeats to. */ - def update(): Unit = { - val previous = current - current = selectPeers + private def apply( + old: ClusterHeartbeatSenderState, + consistentHash: ConsistentHash[Address], + all: Set[Address]): ClusterHeartbeatSenderState = { + + /** + * Select a few peers that heartbeats will be sent to, i.e. that will + * monitor this node. Try to send heartbeats to same nodes as much + * as possible, but re-balance with consistent hashing algorithm when + * new members are added or removed. + */ + def selectPeers: Set[Address] = { + val allSize = all.size + val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers) + // try more if consistentHash results in same node as already selected + val attemptLimit = nrOfPeers * 2 + @tailrec def select(acc: Set[Address], n: Int): Set[Address] = { + if (acc.size == nrOfPeers || n == attemptLimit) acc + else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1) + } + if (nrOfPeers >= allSize) all + else select(Set.empty[Address], 0) + } + + val curr = selectPeers // start ending process for nodes not selected any more - ending ++= (previous -- current).map(_ -> 0) // abort ending process for nodes that have been selected again - ending --= current + val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr + old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end, + joinInProgress = old.joinInProgress -- all) } - /** - * Select a few peers that heartbeats will be sent to, i.e. that will - * monitor this node. Try to send heartbeats to same nodes as much - * as possible, but re-balance with consistent hashing algorithm when - * new members are added or removed. - */ - def selectPeers: Set[Address] = { - val allSize = all.size - val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers) - // try more if consistentHash results in same node as already selected - val attemptLimit = nrOfPeers * 2 - @tailrec def select(acc: Set[Address], n: Int): Set[Address] = { - if (acc.size == nrOfPeers || n == attemptLimit) acc - else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1) - } - if (nrOfPeers >= allSize) all - else select(Set.empty[Address], 0) +} + +/** + * INTERNAL API + * + * State used by [akka.cluster.ClusterHeartbeatSender]. + * The initial state is created with `empty` in the of + * the companion object, thereafter the state is modified + * with the methods, such as `addMember`. It is immutable, + * i.e. the methods return new instances. + */ +private[cluster] case class ClusterHeartbeatSenderState private ( + consistentHash: ConsistentHash[Address], + selfAddressStr: String, + monitoredByNrOfMembers: Int, + all: Set[Address] = Set.empty, + current: Set[Address] = Set.empty, + ending: Map[Address, Int] = Map.empty, + joinInProgress: Map[Address, Deadline] = Map.empty) { + + // FIXME can be disabled as optimization + assertInvariants + + private def assertInvariants: Unit = { + val currentAndEnding = current.intersect(ending.keySet) + require(currentAndEnding.isEmpty, + "Same nodes in current and ending not allowed, got [%s]" format currentAndEnding) + val joinInProgressAndAll = joinInProgress.keySet.intersect(all) + require(joinInProgressAndAll.isEmpty, + "Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll) + val currentNotInAll = current -- all + require(currentNotInAll.isEmpty, + "Nodes in current but not in all not allowed, got [%s]" format currentNotInAll) + require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]" + format all) + } + + val active: Set[Address] = current ++ joinInProgress.keySet + + def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), + all = nodes) + + def addMember(a: Address): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a) + + def removeMember(a: Address): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a) + + def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { + if (joinInProgress contains address) + copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0)) + else this + } + + def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = { + if (all contains address) this + else copy(joinInProgress = joinInProgress + (address -> deadline)) } /** * Cleanup overdue joinInProgress, in case a joining node never * became member, for some reason. */ - def removeOverdueJoinInProgress(): Unit = { + def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = { val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } - if (overdue.nonEmpty) { - log.info("Overdue join in progress [{}]", overdue.mkString(", ")) - ending ++= overdue.map(_ -> 0) - joinInProgress --= overdue - } + if (overdue.isEmpty) this + else + copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue) } + def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a) + + def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1))) + } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala new file mode 100644 index 0000000000..bd378ed0fe --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.actor.Address +import akka.routing.ConsistentHash +import scala.concurrent.util.Deadline +import scala.concurrent.util.duration._ + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { + + val selfAddress = Address("akka", "sys", "myself", 2552) + val aa = Address("akka", "sys", "aa", 2552) + val bb = Address("akka", "sys", "bb", 2552) + val cc = Address("akka", "sys", "cc", 2552) + val dd = Address("akka", "sys", "dd", 2552) + val ee = Address("akka", "sys", "ee", 2552) + + val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10), + selfAddress.toString, 3) + + "A ClusterHeartbeatSenderState" must { + + "return empty active set when no nodes" in { + emptyState.active.isEmpty must be(true) + } + + "include joinInProgress in active set" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds) + s.joinInProgress.keySet must be(Set(aa)) + s.active must be(Set(aa)) + } + + "remove joinInProgress from active set after removeOverdueJoinInProgress" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress() + s.joinInProgress must be(Map.empty) + s.active must be(Set.empty) + } + + "remove joinInProgress after reset" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb)) + s.joinInProgress must be(Map.empty) + } + + "include nodes from reset in active set" in { + val nodes = Set(aa, bb, cc) + val s = emptyState.reset(nodes) + s.all must be(nodes) + s.current must be(nodes) + s.ending must be(Map.empty) + s.active must be(nodes) + } + + "limit current nodes to monitoredByNrOfMembers when adding members" in { + val nodes = Set(aa, bb, cc, dd) + val s = nodes.foldLeft(emptyState) { _ addMember _ } + s.all must be(nodes) + s.current.size must be(3) + s.addMember(ee).current.size must be(3) + } + + "move meber to ending set when removing member" in { + val nodes = Set(aa, bb, cc, dd, ee) + val s = emptyState.reset(nodes) + s.ending must be(Map.empty) + val included = s.current.head + val s2 = s.removeMember(included) + s2.ending must be(Map(included -> 0)) + s2.current must not contain (included) + val s3 = s2.addMember(included) + s3.current must contain(included) + s3.ending.keySet must not contain (included) + } + + "increase ending count correctly" in { + val s = emptyState.reset(Set(aa)).removeMember(aa) + s.ending must be(Map(aa -> 0)) + val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa) + s2.ending must be(Map(aa -> 2)) + } + + } +} From 24c115547d43c2aa142324214c308480a7c7043b Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Wed, 10 Oct 2012 09:58:18 -0600 Subject: [PATCH 08/23] 2609 Typo in method name for ClusterRouteeProvider --- .../main/scala/akka/cluster/routing/ClusterRouterConfig.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 52a9a55e21..eaaaa2c4a5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -196,7 +196,7 @@ private[akka] class ClusterRouteeProvider( private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees - val currentNodes = availbleNodes + val currentNodes = availableNodes if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { None } else { @@ -222,7 +222,7 @@ private[akka] class ClusterRouteeProvider( case a ⇒ a } - private[routing] def availbleNodes: SortedSet[Address] = { + private[routing] def availableNodes: SortedSet[Address] = { import Member.addressOrdering val currentNodes = nodes if (currentNodes.isEmpty && settings.allowLocalRoutees) From 61481da6cd57cdd794094a4b8f89c287dcee0286 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Wed, 10 Oct 2012 10:05:41 -0600 Subject: [PATCH 09/23] 2609 Fixed more 'available' typos in ClusterRouteeProvider --- .../scala/akka/cluster/routing/ClusterRouterConfig.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index eaaaa2c4a5..bdbb1297fb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -236,11 +236,11 @@ private[akka] class ClusterRouteeProvider( private[routing] var nodes: SortedSet[Address] = { import Member.addressOrdering cluster.readView.members.collect { - case m if isAvailble(m) ⇒ m.address + case m if isAvailable(m) ⇒ m.address } } - private[routing] def isAvailble(m: Member): Boolean = { + private[routing] def isAvailable(m: Member): Boolean = { m.status == MemberStatus.Up && (settings.allowLocalRoutees || m.address != cluster.selfAddress) } @@ -271,10 +271,10 @@ private[akka] class ClusterRouterActor extends Router { override def routerReceive: Receive = { case s: CurrentClusterState ⇒ import Member.addressOrdering - routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailble(m) ⇒ m.address } + routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailable(m) ⇒ m.address } routeeProvider.createRoutees() - case m: MemberEvent if routeeProvider.isAvailble(m.member) ⇒ + case m: MemberEvent if routeeProvider.isAvailable(m.member) ⇒ routeeProvider.nodes += m.member.address // createRoutees will create routees based on // totalInstances and maxInstancesPerNode From 279fd2b6ef4a5d8aef16f7f0dd2d188b9b378a1c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 10 Oct 2012 18:13:08 +0200 Subject: [PATCH 10/23] Fix bug introduced in refactoring, see #2284 --- .../scala/akka/cluster/ClusterHeartbeat.scala | 17 ++++++-------- .../ClusterHeartbeatSenderStateSpec.scala | 22 ++++++++++++++++++- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 325e0aae25..fef88ece20 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -128,9 +128,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def removeMember(m: Member): Unit = if (m.address != selfAddress) state = state removeMember m.address - def removeJoinInProgress(address: Address): Unit = if (address != selfAddress) - state = state.removeJoinInProgress(address) - def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress) state = state.addJoinInProgress(address, deadline) @@ -208,8 +205,7 @@ private[cluster] object ClusterHeartbeatSenderState { // start ending process for nodes not selected any more // abort ending process for nodes that have been selected again val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr - old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end, - joinInProgress = old.joinInProgress -- all) + old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end) } } @@ -252,16 +248,17 @@ private[cluster] case class ClusterHeartbeatSenderState private ( val active: Set[Address] = current ++ joinInProgress.keySet def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), + ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ }, + consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), all = nodes) def addMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a) + ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a) def removeMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a) + ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a) - def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { + private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { if (joinInProgress contains address) copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0)) else this @@ -269,7 +266,7 @@ private[cluster] case class ClusterHeartbeatSenderState private ( def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = { if (all contains address) this - else copy(joinInProgress = joinInProgress + (address -> deadline)) + else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address) } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index bd378ed0fe..3850524c24 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -40,13 +40,33 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress() s.joinInProgress must be(Map.empty) s.active must be(Set.empty) + s.ending must be(Map(aa -> 0)) } "remove joinInProgress after reset" in { - val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb)) + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)) s.joinInProgress must be(Map.empty) } + "remove joinInProgress after addMember" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa) + s.joinInProgress must be(Map.empty) + } + + "remove joinInProgress after removeMember" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa) + s.joinInProgress must be(Map.empty) + s.ending must be(Map(aa -> 0)) + } + + "remove from ending after addJoinInProgress" in { + val s = emptyState.reset(Set(aa, bb)).removeMember(aa) + s.ending must be(Map(aa -> 0)) + val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds) + s2.joinInProgress.keySet must be(Set(aa)) + s2.ending must be(Map.empty) + } + "include nodes from reset in active set" in { val nodes = Set(aa, bb, cc) val s = emptyState.reset(nodes) From b965011a132418280a0b5db462c6c998928a0a95 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 11 Oct 2012 09:26:09 +0200 Subject: [PATCH 11/23] config lib 0.6.0 --- project/AkkaBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index ff1085c32e..00ea3b26a8 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -696,7 +696,7 @@ object Dependencies { object Dependency { // Compile val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 - val config = "com.typesafe" % "config" % "0.5.2" // ApacheV2 + val config = "com.typesafe" % "config" % "0.6.0" // ApacheV2 val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) From ef695cbb29b89271b5c7520a53d4e376e6220368 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 12 Oct 2012 13:00:16 +0200 Subject: [PATCH 12/23] LogRoleReplace filter out ESC characters --- .../src/test/scala/akka/remote/testkit/LogRoleReplace.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala index 51a189b7f9..6905b9b116 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala @@ -93,7 +93,7 @@ object LogRoleReplace extends ClipboardOwner { class LogRoleReplace { private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started with address \[akka://.*@([\w\-\.]+):([0-9]+)\]""".r - private val ColorCode = """\[[0-9]+m""" + private val ColorCode = """\u001B?\[[0-9]+m""" private var replacements: Map[String, String] = Map.empty From 91f6c5a94dad08a481ffaea24d3ba55b7edec1ef Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 12 Oct 2012 13:27:31 +0200 Subject: [PATCH 13/23] Adjust barriers/checks in LeaderElectionSpec, see #2583 * Previously it didn't check for unreachable, before down --- .../akka/cluster/LeaderElectionSpec.scala | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index b16102e398..573121272b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -4,10 +4,12 @@ package akka.cluster +import language.postfixOps import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ +import scala.concurrent.util.duration._ case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val controller = role("controller") @@ -69,7 +71,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig val leaderAddress = address(leader) enterBarrier("before-shutdown" + n) testConductor.shutdown(leader, 0) - enterBarrier("after-shutdown" + n, "after-down" + n, "completed" + n) + enterBarrier("after-shutdown" + n, "after-unavailable" + n, "after-down" + n, "completed" + n) case `leader` ⇒ enterBarrier("before-shutdown" + n, "after-shutdown" + n) @@ -78,15 +80,25 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig case `aUser` ⇒ val leaderAddress = address(leader) enterBarrier("before-shutdown" + n, "after-shutdown" + n) + + // detect failure + markNodeAsUnavailable(leaderAddress) + awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress)) + enterBarrier("after-unavailable" + n) + // user marks the shutdown leader as DOWN cluster.down(leaderAddress) enterBarrier("after-down" + n, "completed" + n) - markNodeAsUnavailable(leaderAddress) case _ if remainingRoles.contains(myself) ⇒ // remaining cluster nodes, not shutdown - enterBarrier("before-shutdown" + n, "after-shutdown" + n, "after-down" + n) + val leaderAddress = address(leader) + enterBarrier("before-shutdown" + n, "after-shutdown" + n) + awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress)) + enterBarrier("after-unavailable" + n) + + enterBarrier("after-down" + n) awaitUpConvergence(currentRoles.size - 1) val nextExpectedLeader = remainingRoles.head clusterView.isLeader must be(myself == nextExpectedLeader) @@ -97,12 +109,12 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig } } - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in within(20 seconds) { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) enterBarrier("after-2") } - "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in { + "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in within(20 seconds) { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) enterBarrier("after-3") } From ec438ca2a6183289a0e61b947842bc50e18ba4a3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 12 Oct 2012 14:15:00 +0200 Subject: [PATCH 14/23] Logback and slf4j dependencies, see #2615 * Latest slf4j 1.6.6 * Latest logback-classic 1.0.7 * Removed runtime scope for logback in docs --- akka-docs/rst/java/logging.rst | 3 +-- akka-docs/rst/scala/logging.rst | 2 +- project/AkkaBuild.scala | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/akka-docs/rst/java/logging.rst b/akka-docs/rst/java/logging.rst index eefce2b35d..0f857837c5 100644 --- a/akka-docs/rst/java/logging.rst +++ b/akka-docs/rst/java/logging.rst @@ -194,8 +194,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4 ch.qos.logback logback-classic - 1.0.4 - runtime + 1.0.7 You need to enable the Slf4jEventHandler in the 'event-handlers' element in diff --git a/akka-docs/rst/scala/logging.rst b/akka-docs/rst/scala/logging.rst index 60cd3f2a61..f8c3e11f27 100644 --- a/akka-docs/rst/scala/logging.rst +++ b/akka-docs/rst/scala/logging.rst @@ -232,7 +232,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4 .. code-block:: scala - lazy val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "runtime" + lazy val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" You need to enable the Slf4jEventHandler in the 'event-handlers' element in diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 00ea3b26a8..23698d3b1a 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -701,7 +701,7 @@ object Dependency { val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) - val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT + val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.6" // MIT val zeroMQClient = "org.zeromq" % "zeromq-scala-binding" % "0.0.6" cross CrossVersion.full // ApacheV2 val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2 @@ -717,7 +717,7 @@ object Dependency { val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2 val commonsIo = "commons-io" % "commons-io" % "2.0.1" % "test" // ApacheV2 val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0 - val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "test" // EPL 1.0 / LGPL 2.1 + val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" % "test" // EPL 1.0 / LGPL 2.1 val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT val scalatest = "org.scalatest" % "scalatest" % "1.9-2.10.0-M7-B1" % "test" cross CrossVersion.full // ApacheV2 val scalacheck = "org.scalacheck" % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD From 76c158e63c5b21525c0820571e27cf6fa5b4cc51 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 12 Oct 2012 18:46:20 +0200 Subject: [PATCH 15/23] slf4j 1.7.2, see #2615 --- project/AkkaBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 23698d3b1a..7d0789aaae 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -701,7 +701,7 @@ object Dependency { val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) - val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.6" // MIT + val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.2" // MIT val zeroMQClient = "org.zeromq" % "zeromq-scala-binding" % "0.0.6" cross CrossVersion.full // ApacheV2 val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2 From 0ab417b2c1fe1d4b78827d9ddd7d33935722e310 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 14 Oct 2012 03:35:09 +0200 Subject: [PATCH 16/23] #2612 - Clarifying ReceiveTimeout semantics in the documentation. --- .../actor/MyReceivedTimeoutUntypedActor.java | 4 ++++ akka-docs/rst/java/untyped-actors.rst | 19 +++++++++++++------ akka-docs/rst/scala/actors.rst | 19 +++++++++++++------ .../scala/code/docs/actor/ActorDocSpec.scala | 11 +++++++++-- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java b/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java index b1fb899be7..66c0bd4b20 100644 --- a/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java +++ b/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java @@ -11,13 +11,17 @@ import scala.concurrent.util.Duration; public class MyReceivedTimeoutUntypedActor extends UntypedActor { public MyReceivedTimeoutUntypedActor() { + // To set an initial delay getContext().setReceiveTimeout(Duration.parse("30 seconds")); } public void onReceive(Object message) { if (message.equals("Hello")) { + // To set in a response to a message + getContext().setReceiveTimeout(Duration.parse("30 seconds")); getSender().tell("Hello world", getSelf()); } else if (message == ReceiveTimeout.getInstance()) { + // To turn it off throw new RuntimeException("received timeout"); } else { unhandled(message); diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index 685a0903d5..2ee8bc397f 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -431,13 +431,20 @@ defaults to a 'dead-letter' actor ref. getSender().tell(result); // will have dead-letter actor as default } -Initial receive timeout -======================= +Receive timeout +=============== -A timeout mechanism can be used to receive a message when no initial message is -received within a certain time. To receive this timeout you have to set the -``receiveTimeout`` property and declare handing for the ReceiveTimeout -message. +The `UntypedActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which +the sending of a `ReceiveTimeout` message is triggered. +When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message. +1 millisecond is the minimum supported timeout. + +Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after +another message was enqueued; hence it is **not guaranteed** that upon reception of the receive +timeout there must have been an idle period beforehand as configured via this method. + +Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity +periods). Pass in `Duration.Undefined` to switch off this feature. .. includecode:: code/docs/actor/MyReceivedTimeoutUntypedActor.java#receive-timeout diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index fea94dec0d..3000a2e55a 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -549,13 +549,20 @@ defaults to a 'dead-letter' actor ref. val result = process(request) sender ! result // will have dead-letter actor as default -Initial receive timeout -======================= +Receive timeout +=============== -A timeout mechanism can be used to receive a message when no initial message is -received within a certain time. To receive this timeout you have to set the -``receiveTimeout`` property and declare a case handing the ReceiveTimeout -object. +The `ActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which +the sending of a `ReceiveTimeout` message is triggered. +When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message. +1 millisecond is the minimum supported timeout. + +Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after +another message was enqueued; hence it is **not guaranteed** that upon reception of the receive +timeout there must have been an idle period beforehand as configured via this method. + +Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity +periods). Pass in `Duration.Undefined` to switch off this feature. .. includecode:: code/docs/actor/ActorDocSpec.scala#receive-timeout diff --git a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala index 0cd43bdd7e..291c02bd85 100644 --- a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala @@ -269,11 +269,18 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#receive-timeout import akka.actor.ReceiveTimeout import scala.concurrent.util.duration._ + import scala.concurrent.util.Duration class MyActor extends Actor { + // To set an initial delay context.setReceiveTimeout(30 milliseconds) def receive = { - case "Hello" ⇒ //... - case ReceiveTimeout ⇒ throw new RuntimeException("received timeout") + case "Hello" ⇒ + // To set in a response to a message + context.setReceiveTimeout(30 milliseconds) + case ReceiveTimeout ⇒ + // To turn it off + context.setReceiveTimeout(Duration.Undefined) + throw new RuntimeException("Receive timed out") } } //#receive-timeout From c813178373cacffe3b28fc82935adab715fb908a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 14 Oct 2012 03:40:10 +0200 Subject: [PATCH 17/23] #2613 - Harmonized the Akka CONTRIBUTING.md with the Typesafe Developer Guidelines --- CONTRIBUTING.md | 116 +++++++++++++++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 41 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fc929aca31..7b2b0af92d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,57 +1,91 @@ #Contributing to Akka# -Greetings traveller! - ##Infrastructure## * [Akka Contributor License Agreement](http://www.typesafe.com/contribute/cla) * [Akka Issue Tracker](http://doc.akka.io/docs/akka/current/project/issue-tracking.html) * [Scalariform](https://github.com/mdr/scalariform) -##Workflow## +# Typesafe Project & Developer Guidelines -0. Sign the Akka Contributor License Agreement, - we won't accept anything from anybody who has not signed it. -1. Find-or-create a ticket in the issue tracker -2. Assign that ticket to yourself -3. Create a local branch with the following name format: wip-X-Y-Z - where the X is the number of the ticket in the tracker, - and Y is some brief keywords of the ticket title and Z is your initials or similar. - Example: wip-2373-add-contributing-md-√ -4. Do what needs to be done (with tests and docs if applicable). - Your branch should pass all tests before going any further. -5. Push the branch to your clone of the Akka repository -6. Create a Pull Request onto the applicable Akka branch, - if the number of commits are more than a few, please squash the - commits first. -7. Change the status of your ticket to "Test" -8. The Pull Request will be reviewed by the Akka committers -9. Modify the Pull Request as agreed upon during the review, - then push the changes to your branch in your Akka repository, - the Pull Request should be automatically updated with the new - content. -10. Several cycles of review-then-change might occur. -11. Pull Request is either merged by the Akka committers, - or rejected, and the associated ticket will be updated to - reflect that. -12. Delete the local and remote wip-X-Y-Z +These guidelines are meant to be a living document that should be changed and adapted as needed. We encourage changes that makes it easier to achieve our goals in an efficient way. -##Code Reviews## +These guidelines mainly applies to Typesafe’s “mature” projects - not necessarily to projects of the type ‘collection of scripts’ etc. -Akka utilizes peer code reviews to streamline the codebase, reduce the defect ratio, -increase maintainability and spread knowledge about how things are solved. +## General Workflow -Core review values: +This is the process for committing code into master. There are of course exceptions to these rules, for example minor changes to comments and documentation, fixing a broken build etc. -* Rule: [The Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule) - - Why: Small improvements add up over time, keeping the codebase in shape. -* Rule: [Don't Repeat Yourself](http://programmer.97things.oreilly.com/wiki/index.php/Don't_Repeat_Yourself) - - Why: Repetitions are not maintainable, keeping things DRY makes it easier to fix bugs and refactor, - since you only need to apply the correction in one place, or perform the refactoring at one place. -* Rule: Feature tests > Integration tests > Unit tests - - Why: Without proving that a feature works, the code is only liability. - Without proving that a feature works with other features, the code is of limited value. - Without proving the individual parts of a feature works, the code is harder to debug. +1. Make sure you have signed the [Typesafe CLA](http://www.typesafe.com/contribute/cla), if not, sign it online. +2. Before starting to work on a feature or a fix, you have to make sure that: + 1. There is a ticket for your work in the project's issue tracker. If not, create it first. + 2. The ticket has been scheduled for the current milestone. + 3. The ticket is estimated by the team. + 4. The ticket have been discussed and prioritized by the team. +3. You should always perform your work in a Git feature branch. The branch should be given a descriptive name that explains its intent. Some teams also like adding the ticket number and/or the [GitHub](http://github.com) user ID to the branch name, these details is up to each of the individual teams. +4. When the feature or fix is completed you should open a [Pull Request](https://help.github.com/articles/using-pull-requests) on GitHub. +5. The Pull Request should be reviewed by other maintainers (as many as feasible/practical). Note that the maintainers can consist of outside contributors, both within and outside Typesafe. Outside contributors (for example from EPFL or independent committers) are encouraged to participate in the review process, it is not a closed process. +6. After the review you should fix the issues as needed (pushing a new commit for new review etc.), iterating until the reviewers give their thumbs up. +7. Once the code has passed review the Pull Request can be merged into the master branch. + +## Pull Request Requirements + +For a Pull Request to be considered at all it has to meet these requirements: + +1. Live up to the current code standard: + - Not violate [DRY](http://programmer.97things.oreilly.com/wiki/index.php/Don%27t_Repeat_Yourself). + - [Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule) needs to have been applied. +2. Regardless if the code introduces new features or fixes bugs or regressions, it must have comprehensive tests. +3. The code must be well documented in the Typesafe's standard documentation format (see the ‘Documentation’ section below). + +If these requirements are not met then the code should **not** be merged into master, or even reviewed - regardless of how good or important it is. No exceptions. + +## Continuous Integration + +Each project should be configured to use a continuous integration (CI) tool (i.e. a build server ala Jenkins). Typesafe has a Jenkins server farm that can be used. The CI tool should, on each push to master, build the **full** distribution and run **all** tests, and if something fails it should email out a notification with the failure report to the committer and the core team. The CI tool should also be used in conjunction with Typesafe’s Pull Request Validator (discussed below). + +## Documentation + +All documentation should be generated using the sbt-site-plugin, *or* publish artifacts to a repository that can be consumed by the typesafe stack. + +All documentation must abide by the following maxims: + +- Example code should be run as part of an automated test suite. +- Version should be **programmatically** specifiable to the build. +- Generation should be **completely automated** and available for scripting. +- Artifacts that must be included in the Typesafe Stack should be published to a maven “documentation” repository as documentation artifacts. + +All documentation is preferred to be in Typesafe's standard documentation format [reStructuredText](http://doc.akka.io/docs/akka/snapshot/dev/documentation.html) compiled using Typesafe's customized [Sphinx](http://sphinx.pocoo.org/) based documentation generation system, which among other things allows all code in the documentation to be externalized into compiled files and imported into the documentation. + +For more info, or for a starting point for new projects, look at the [Typesafe Documentation Sample project](https://github.com/typesafehub/doc-example). + +For larger projects that have invested a lot of time and resources into their current documentation and samples scheme (like for example Play), it is understandable that it will take some time to migrate to this new model. In these cases someone from the project needs to take the responsibility of manual QA and verifier for the documentation and samples. + +## Work In Progress + +It is ok to work on a public feature branch in the GitHub repository. Something that can sometimes be useful for early feedback etc. If so then it is preferable to name the branch accordingly. This can be done by either prefix the name with ``wip-`` as in ‘Work In Progress’, or use hierarchical names like ``wip/..``, ``feature/..`` or ``topic/..``. Either way is fine as long as it is clear that it is work in progress and not ready for merge. This work can temporarily have a lower standard. However, to be merged into master it will have to go through the regular process outlined above, with Pull Request, review etc.. + +Also, to facilitate both well-formed commits and working together, the ``wip`` and ``feature``/``topic`` identifiers also have special meaning. Any branch labelled with ``wip`` is considered “git-unstable” and may be rebased and have its history rewritten. Any branch with ``feature``/``topic`` in the name is considered “stable” enough for others to depend on when a group is working on a feature. + +## Creating Commits And Writing Commit Messages + +Follow these guidelines when creating public commits and writing commit messages. + +1. If your work spans multiple local commits (for example; if you do safe point commits while working in a feature branch or work in a branch for long time doing merges/rebases etc.) then please do not commit it all but rewrite the history by squashing the commits into a single big commit which you write a good commit message for (like discussed in the following sections). For more info read this article: [Git Workflow](http://sandofsky.com/blog/git-workflow.html). Every commit should be able to be used in isolation, cherry picked etc. +2. First line should be a descriptive sentence what the commit is doing. It should be possible to fully understand what the commit does by just reading this single line. It is **not ok** to only list the ticket number, type "minor fix" or similar. Include reference to ticket number, prefixed with #, at the end of the first line. If the commit is a small fix, then you are done. If not, go to 3. +3. Following the single line description should be a blank line followed by an enumerated list with the details of the commit. +4. Add keywords for your commit (depending on the degree of automation we reach, the list may change over time): + * ``Review by @gituser`` - if you want to notify someone on the team. The others can, and are encouraged to participate. + * ``Fix/Fixing/Fixes/Close/Closing/Refs #ticket`` - if you want to mark the ticket as fixed in the issue tracker (Assembla understands this). + * ``backport to _branch name_`` - if the fix needs to be cherry-picked to another branch (like 2.9.x, 2.10.x, etc) + +Example: + + Added monadic API to Future. Fixes #2731 + + * Details 1 + * Details 2 + * Details 3 ##Source style## From f5895ba64c3e5d76e0d40a537b2e48c81106cd26 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 14 Oct 2012 04:05:55 +0200 Subject: [PATCH 18/23] Adding SBT transient project config classes to .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index f646a4c173..82c89b5692 100755 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ project/plugins/project project/boot/* */project/build/target */project/boot +*/project/project.target.config-classes lib_managed etags tags From 3a0cf84d2c8a43a71e058c5542ca584584ef09f7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 14 Oct 2012 04:43:13 +0200 Subject: [PATCH 19/23] Increasing timeouts for FutureDocTestBase to avoid Jenkins being too slow. --- .../code/docs/future/FutureDocTestBase.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java b/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java index 7b1e1f2be5..fc5c2937a1 100644 --- a/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java @@ -113,7 +113,7 @@ public class FutureDocTestBase { return "Hello" + "World"; } }, system.dispatcher()); - String result = (String) Await.result(f, Duration.create(1, SECONDS)); + String result = (String) Await.result(f, Duration.create(5, SECONDS)); //#future-eval assertEquals("HelloWorld", result); } @@ -135,7 +135,7 @@ public class FutureDocTestBase { } }, ec); - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); //#map } @@ -159,7 +159,7 @@ public class FutureDocTestBase { }, ec); //#map2 - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); } @@ -183,7 +183,7 @@ public class FutureDocTestBase { }, ec); //#map3 - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); } @@ -209,7 +209,7 @@ public class FutureDocTestBase { }, ec); //#flat-map - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); } @@ -238,7 +238,7 @@ public class FutureDocTestBase { } }, ec); - long result = Await.result(futureSum, Duration.create(1, SECONDS)); + long result = Await.result(futureSum, Duration.create(5, SECONDS)); //#sequence assertEquals(3L, result); } @@ -262,7 +262,7 @@ public class FutureDocTestBase { }, ec); //Returns the sequence of strings as upper case - Iterable result = Await.result(futureResult, Duration.create(1, SECONDS)); + Iterable result = Await.result(futureResult, Duration.create(5, SECONDS)); assertEquals(Arrays.asList("A", "B", "C"), result); //#traverse } @@ -286,7 +286,7 @@ public class FutureDocTestBase { return r + t; //Just concatenate } }, ec); - String result = Await.result(resultFuture, Duration.create(1, SECONDS)); + String result = Await.result(resultFuture, Duration.create(5, SECONDS)); //#fold assertEquals("ab", result); @@ -310,7 +310,7 @@ public class FutureDocTestBase { } }, ec); - Object result = Await.result(resultFuture, Duration.create(1, SECONDS)); + Object result = Await.result(resultFuture, Duration.create(5, SECONDS)); //#reduce assertEquals("ab", result); @@ -326,10 +326,10 @@ public class FutureDocTestBase { Future otherFuture = Futures.failed( new IllegalArgumentException("Bang!")); //#failed - Object result = Await.result(future, Duration.create(1, SECONDS)); + Object result = Await.result(future, Duration.create(5, SECONDS)); assertEquals("Yay!", result); Throwable result2 = Await.result(otherFuture.failed(), - Duration.create(1, SECONDS)); + Duration.create(5, SECONDS)); assertEquals("Bang!", result2.getMessage()); } @@ -399,7 +399,7 @@ public class FutureDocTestBase { throw problem; } }, ec); - int result = Await.result(future, Duration.create(1, SECONDS)); + int result = Await.result(future, Duration.create(5, SECONDS)); assertEquals(result, 0); //#recover } @@ -425,7 +425,7 @@ public class FutureDocTestBase { throw problem; } }, ec); - int result = Await.result(future, Duration.create(1, SECONDS)); + int result = Await.result(future, Duration.create(5, SECONDS)); assertEquals(result, 0); //#try-recover } @@ -497,7 +497,7 @@ public class FutureDocTestBase { } }, ec); - String result = Await.result(future3, Duration.create(1, SECONDS)); + String result = Await.result(future3, Duration.create(5, SECONDS)); assertEquals("foo bar", result); //#zip } @@ -509,7 +509,7 @@ public class FutureDocTestBase { Future future3 = Futures.successful("bar"); // Will have "bar" in this case Future future4 = future1.fallbackTo(future2).fallbackTo(future3); - String result = Await.result(future4, Duration.create(1, SECONDS)); + String result = Await.result(future4, Duration.create(5, SECONDS)); assertEquals("bar", result); //#fallback-to } From 7fa58f735ab92ae336364294b2496bc790211b3b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 15 Oct 2012 12:02:08 +0200 Subject: [PATCH 20/23] Adding link to CONTRIBUTING.md in developer-guidelines.rst --- akka-docs/rst/dev/developer-guidelines.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-docs/rst/dev/developer-guidelines.rst b/akka-docs/rst/dev/developer-guidelines.rst index 903f2d64d9..17665c34e2 100644 --- a/akka-docs/rst/dev/developer-guidelines.rst +++ b/akka-docs/rst/dev/developer-guidelines.rst @@ -3,6 +3,8 @@ Developer Guidelines ==================== +First read: `The Akka Contributor Guidelines `_ . + Code Style ---------- From dbad8c85f6f94b47b12b12fba3a1e587ce59f9be Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 15 Oct 2012 13:49:12 +0200 Subject: [PATCH 21/23] Clarifying setReceiveTimeout examples after review --- .../java/code/docs/actor/MyReceivedTimeoutUntypedActor.java | 3 ++- akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java b/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java index 66c0bd4b20..d2b2322acc 100644 --- a/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java +++ b/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java @@ -18,10 +18,11 @@ public class MyReceivedTimeoutUntypedActor extends UntypedActor { public void onReceive(Object message) { if (message.equals("Hello")) { // To set in a response to a message - getContext().setReceiveTimeout(Duration.parse("30 seconds")); + getContext().setReceiveTimeout(Duration.parse("10 seconds")); getSender().tell("Hello world", getSelf()); } else if (message == ReceiveTimeout.getInstance()) { // To turn it off + getContext().setReceiveTimeout(Duration.Undefined()); throw new RuntimeException("received timeout"); } else { unhandled(message); diff --git a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala index 291c02bd85..ebd591db88 100644 --- a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala @@ -274,9 +274,9 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { // To set an initial delay context.setReceiveTimeout(30 milliseconds) def receive = { - case "Hello" ⇒ + case "Hello" ⇒ // To set in a response to a message - context.setReceiveTimeout(30 milliseconds) + context.setReceiveTimeout(100 milliseconds) case ReceiveTimeout ⇒ // To turn it off context.setReceiveTimeout(Duration.Undefined) From ec4871e801be2841f726ac14b0b377721484fd54 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 15 Oct 2012 14:04:20 +0200 Subject: [PATCH 22/23] Removing rst_html and rst_latex --- .../main/scala/akka/actor/TypedActor.scala | 26 +++---- akka-docs/rst/general/supervision.rst | 5 ++ project/AkkaBuild.scala | 76 ++++++++++--------- 3 files changed, 54 insertions(+), 53 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 69a3707f48..90cdc81744 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -607,8 +607,7 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory protected def actorFactory: ActorRefFactory = system protected def typedActor = this - val serialization = SerializationExtension(system) - val settings = system.settings + import system.settings /** * Default timeout for typed actor methods with non-void return type @@ -635,23 +634,18 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ⇒ ActorRef): R = { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) - val classLoader: ClassLoader = if (props.loader.nonEmpty) props.loader.get else props.interfaces.headOption.map(_.getClassLoader).orNull //If we have no loader, we arbitrarily take the loader of the first interface val proxy = Proxy.newProxyInstance( - classLoader, + (props.loader orElse props.interfaces.collectFirst { case any ⇒ any.getClassLoader }).orNull, //If we have no loader, we arbitrarily take the loader of the first interface props.interfaces.toArray, - new TypedActorInvocationHandler( - this, - actorVar, - if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R] + new TypedActorInvocationHandler(this, actorVar, props.timeout getOrElse DefaultReturnTimeout)).asInstanceOf[R] - proxyVar match { - case null ⇒ - actorVar.set(actorRef) - proxy - case _ ⇒ - proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - actorVar.set(actorRef) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet - proxyVar.get + if (proxyVar eq null) { + actorVar set actorRef + proxy + } else { + proxyVar set proxy // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + actorVar set actorRef //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet + proxyVar.get } } diff --git a/akka-docs/rst/general/supervision.rst b/akka-docs/rst/general/supervision.rst index c28bbfc4f2..2b747316b2 100644 --- a/akka-docs/rst/general/supervision.rst +++ b/akka-docs/rst/general/supervision.rst @@ -189,6 +189,11 @@ external resource, which may also be one of its own children. If a third party terminates a child by way of the ``system.stop(child)`` method or sending a :class:`PoisonPill`, the supervisor might well be affected. +..warning:: + + DeathWatch for Akka Remote does not (yet) get triggered by connection failures. + This feature may be added in a future release of Akka Remoting. + One-For-One Strategy vs. All-For-One Strategy --------------------------------------------- diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 7d0789aaae..75f61aff07 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -648,7 +648,45 @@ object AkkaBuild extends Build { // Dependencies object Dependencies { - import Dependency._ + + object Compile { + // Compile + val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 + + val config = "com.typesafe" % "config" % "0.6.0" // ApacheV2 + val netty = "io.netty" % "netty" % "3.5.8.Final" // ApacheV2 + val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD + val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) + + val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.2" // MIT + val zeroMQClient = "org.zeromq" % "zeromq-scala-binding" % "0.0.6" cross CrossVersion.full // ApacheV2 + val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 + val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2 + val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 + + + // Camel Sample + val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2 + + // Test + + object Test { + val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2 + val commonsIo = "commons-io" % "commons-io" % "2.0.1" % "test" // ApacheV2 + val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0 + val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" % "test" // EPL 1.0 / LGPL 2.1 + val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT + val scalatest = "org.scalatest" % "scalatest" % "1.9-2.10.0-M7-B1" % "test" cross CrossVersion.full // ApacheV2 + val scalacheck = "org.scalacheck" % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD + val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2 + val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2 + val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2 + val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 + val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT + } + } + + import Compile._ val actor = Seq(config) @@ -692,39 +730,3 @@ object Dependencies { val multiNodeSample = Seq(Test.scalatest) } - -object Dependency { - // Compile - val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 - val config = "com.typesafe" % "config" % "0.6.0" // ApacheV2 - val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2 - val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD - val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) - - val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.2" // MIT - val zeroMQClient = "org.zeromq" % "zeromq-scala-binding" % "0.0.6" cross CrossVersion.full // ApacheV2 - val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 - val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2 - val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 - - - // Camel Sample - val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2 - - // Test - - object Test { - val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2 - val commonsIo = "commons-io" % "commons-io" % "2.0.1" % "test" // ApacheV2 - val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0 - val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" % "test" // EPL 1.0 / LGPL 2.1 - val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT - val scalatest = "org.scalatest" % "scalatest" % "1.9-2.10.0-M7-B1" % "test" cross CrossVersion.full // ApacheV2 - val scalacheck = "org.scalacheck" % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD - val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2 - val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2 - val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2 - val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 - val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT - } -} From f488b32b3496c52300857740daaf3d6eb3191eb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Mon, 15 Oct 2012 15:13:59 +0200 Subject: [PATCH 23/23] Corrected mixup in migration guide. --- akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst index 449746ad02..88ae42fb6d 100644 --- a/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst @@ -203,17 +203,17 @@ v2.0 Scala:: v2.1 Scala:: - val router2 = system.actorOf(Props[ExampleActor1].withRouter( - RoundRobinRouter(routees = routees))) + val router2 = system.actorOf(Props.empty.withRouter( + RoundRobinRouter(routees = routees))) v2.0 Java:: - ActorRef router2 = system.actorOf(new Props(ExampleActor.class).withRouter( + ActorRef router2 = system.actorOf(new Props().withRouter( RoundRobinRouter.create(routees))); v2.1 Java:: - ActorRef router2 = system.actorOf(new Props().withRouter( + ActorRef router2 = system.actorOf(Props.empty().withRouter( RoundRobinRouter.create(routees))); Props: Function-based creation