diff --git a/.gitignore b/.gitignore index f646a4c173..48632735ce 100755 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ project/plugins/project project/boot/* */project/build/target */project/boot +*/project/project.target.config-classes lib_managed etags tags @@ -67,3 +68,4 @@ redis/ beanstalk/ .scalastyle bin/ +.worksheet diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fc929aca31..7b2b0af92d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,57 +1,91 @@ #Contributing to Akka# -Greetings traveller! - ##Infrastructure## * [Akka Contributor License Agreement](http://www.typesafe.com/contribute/cla) * [Akka Issue Tracker](http://doc.akka.io/docs/akka/current/project/issue-tracking.html) * [Scalariform](https://github.com/mdr/scalariform) -##Workflow## +# Typesafe Project & Developer Guidelines -0. Sign the Akka Contributor License Agreement, - we won't accept anything from anybody who has not signed it. -1. Find-or-create a ticket in the issue tracker -2. Assign that ticket to yourself -3. Create a local branch with the following name format: wip-X-Y-Z - where the X is the number of the ticket in the tracker, - and Y is some brief keywords of the ticket title and Z is your initials or similar. - Example: wip-2373-add-contributing-md-√ -4. Do what needs to be done (with tests and docs if applicable). - Your branch should pass all tests before going any further. -5. Push the branch to your clone of the Akka repository -6. Create a Pull Request onto the applicable Akka branch, - if the number of commits are more than a few, please squash the - commits first. -7. Change the status of your ticket to "Test" -8. The Pull Request will be reviewed by the Akka committers -9. Modify the Pull Request as agreed upon during the review, - then push the changes to your branch in your Akka repository, - the Pull Request should be automatically updated with the new - content. -10. Several cycles of review-then-change might occur. -11. Pull Request is either merged by the Akka committers, - or rejected, and the associated ticket will be updated to - reflect that. -12. Delete the local and remote wip-X-Y-Z +These guidelines are meant to be a living document that should be changed and adapted as needed. We encourage changes that makes it easier to achieve our goals in an efficient way. -##Code Reviews## +These guidelines mainly applies to Typesafe’s “mature” projects - not necessarily to projects of the type ‘collection of scripts’ etc. -Akka utilizes peer code reviews to streamline the codebase, reduce the defect ratio, -increase maintainability and spread knowledge about how things are solved. +## General Workflow -Core review values: +This is the process for committing code into master. There are of course exceptions to these rules, for example minor changes to comments and documentation, fixing a broken build etc. -* Rule: [The Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule) - - Why: Small improvements add up over time, keeping the codebase in shape. -* Rule: [Don't Repeat Yourself](http://programmer.97things.oreilly.com/wiki/index.php/Don't_Repeat_Yourself) - - Why: Repetitions are not maintainable, keeping things DRY makes it easier to fix bugs and refactor, - since you only need to apply the correction in one place, or perform the refactoring at one place. -* Rule: Feature tests > Integration tests > Unit tests - - Why: Without proving that a feature works, the code is only liability. - Without proving that a feature works with other features, the code is of limited value. - Without proving the individual parts of a feature works, the code is harder to debug. +1. Make sure you have signed the [Typesafe CLA](http://www.typesafe.com/contribute/cla), if not, sign it online. +2. Before starting to work on a feature or a fix, you have to make sure that: + 1. There is a ticket for your work in the project's issue tracker. If not, create it first. + 2. The ticket has been scheduled for the current milestone. + 3. The ticket is estimated by the team. + 4. The ticket have been discussed and prioritized by the team. +3. You should always perform your work in a Git feature branch. The branch should be given a descriptive name that explains its intent. Some teams also like adding the ticket number and/or the [GitHub](http://github.com) user ID to the branch name, these details is up to each of the individual teams. +4. When the feature or fix is completed you should open a [Pull Request](https://help.github.com/articles/using-pull-requests) on GitHub. +5. The Pull Request should be reviewed by other maintainers (as many as feasible/practical). Note that the maintainers can consist of outside contributors, both within and outside Typesafe. Outside contributors (for example from EPFL or independent committers) are encouraged to participate in the review process, it is not a closed process. +6. After the review you should fix the issues as needed (pushing a new commit for new review etc.), iterating until the reviewers give their thumbs up. +7. Once the code has passed review the Pull Request can be merged into the master branch. + +## Pull Request Requirements + +For a Pull Request to be considered at all it has to meet these requirements: + +1. Live up to the current code standard: + - Not violate [DRY](http://programmer.97things.oreilly.com/wiki/index.php/Don%27t_Repeat_Yourself). + - [Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule) needs to have been applied. +2. Regardless if the code introduces new features or fixes bugs or regressions, it must have comprehensive tests. +3. The code must be well documented in the Typesafe's standard documentation format (see the ‘Documentation’ section below). + +If these requirements are not met then the code should **not** be merged into master, or even reviewed - regardless of how good or important it is. No exceptions. + +## Continuous Integration + +Each project should be configured to use a continuous integration (CI) tool (i.e. a build server ala Jenkins). Typesafe has a Jenkins server farm that can be used. The CI tool should, on each push to master, build the **full** distribution and run **all** tests, and if something fails it should email out a notification with the failure report to the committer and the core team. The CI tool should also be used in conjunction with Typesafe’s Pull Request Validator (discussed below). + +## Documentation + +All documentation should be generated using the sbt-site-plugin, *or* publish artifacts to a repository that can be consumed by the typesafe stack. + +All documentation must abide by the following maxims: + +- Example code should be run as part of an automated test suite. +- Version should be **programmatically** specifiable to the build. +- Generation should be **completely automated** and available for scripting. +- Artifacts that must be included in the Typesafe Stack should be published to a maven “documentation” repository as documentation artifacts. + +All documentation is preferred to be in Typesafe's standard documentation format [reStructuredText](http://doc.akka.io/docs/akka/snapshot/dev/documentation.html) compiled using Typesafe's customized [Sphinx](http://sphinx.pocoo.org/) based documentation generation system, which among other things allows all code in the documentation to be externalized into compiled files and imported into the documentation. + +For more info, or for a starting point for new projects, look at the [Typesafe Documentation Sample project](https://github.com/typesafehub/doc-example). + +For larger projects that have invested a lot of time and resources into their current documentation and samples scheme (like for example Play), it is understandable that it will take some time to migrate to this new model. In these cases someone from the project needs to take the responsibility of manual QA and verifier for the documentation and samples. + +## Work In Progress + +It is ok to work on a public feature branch in the GitHub repository. Something that can sometimes be useful for early feedback etc. If so then it is preferable to name the branch accordingly. This can be done by either prefix the name with ``wip-`` as in ‘Work In Progress’, or use hierarchical names like ``wip/..``, ``feature/..`` or ``topic/..``. Either way is fine as long as it is clear that it is work in progress and not ready for merge. This work can temporarily have a lower standard. However, to be merged into master it will have to go through the regular process outlined above, with Pull Request, review etc.. + +Also, to facilitate both well-formed commits and working together, the ``wip`` and ``feature``/``topic`` identifiers also have special meaning. Any branch labelled with ``wip`` is considered “git-unstable” and may be rebased and have its history rewritten. Any branch with ``feature``/``topic`` in the name is considered “stable” enough for others to depend on when a group is working on a feature. + +## Creating Commits And Writing Commit Messages + +Follow these guidelines when creating public commits and writing commit messages. + +1. If your work spans multiple local commits (for example; if you do safe point commits while working in a feature branch or work in a branch for long time doing merges/rebases etc.) then please do not commit it all but rewrite the history by squashing the commits into a single big commit which you write a good commit message for (like discussed in the following sections). For more info read this article: [Git Workflow](http://sandofsky.com/blog/git-workflow.html). Every commit should be able to be used in isolation, cherry picked etc. +2. First line should be a descriptive sentence what the commit is doing. It should be possible to fully understand what the commit does by just reading this single line. It is **not ok** to only list the ticket number, type "minor fix" or similar. Include reference to ticket number, prefixed with #, at the end of the first line. If the commit is a small fix, then you are done. If not, go to 3. +3. Following the single line description should be a blank line followed by an enumerated list with the details of the commit. +4. Add keywords for your commit (depending on the degree of automation we reach, the list may change over time): + * ``Review by @gituser`` - if you want to notify someone on the team. The others can, and are encouraged to participate. + * ``Fix/Fixing/Fixes/Close/Closing/Refs #ticket`` - if you want to mark the ticket as fixed in the issue tracker (Assembla understands this). + * ``backport to _branch name_`` - if the fix needs to be cherry-picked to another branch (like 2.9.x, 2.10.x, etc) + +Example: + + Added monadic API to Future. Fixes #2731 + + * Details 1 + * Details 2 + * Details 3 ##Source style## diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index b4cbee5361..6dbe48ba40 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -607,8 +607,7 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory protected def actorFactory: ActorRefFactory = system protected def typedActor = this - val serialization = SerializationExtension(system) - val settings = system.settings + import system.settings /** * Default timeout for typed actor methods with non-void return type @@ -635,23 +634,18 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ⇒ ActorRef): R = { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) - val classLoader: ClassLoader = if (props.loader.nonEmpty) props.loader.get else props.interfaces.headOption.map(_.getClassLoader).orNull //If we have no loader, we arbitrarily take the loader of the first interface val proxy = Proxy.newProxyInstance( - classLoader, + (props.loader orElse props.interfaces.collectFirst { case any ⇒ any.getClassLoader }).orNull, //If we have no loader, we arbitrarily take the loader of the first interface props.interfaces.toArray, - new TypedActorInvocationHandler( - this, - actorVar, - if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R] + new TypedActorInvocationHandler(this, actorVar, props.timeout getOrElse DefaultReturnTimeout)).asInstanceOf[R] - proxyVar match { - case null ⇒ - actorVar.set(actorRef) - proxy - case _ ⇒ - proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - actorVar.set(actorRef) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet - proxyVar.get + if (proxyVar eq null) { + actorVar set actorRef + proxy + } else { + proxyVar set proxy // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + actorVar set actorRef //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet + proxyVar.get } } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 6aa618f6e7..2ce607d483 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -34,8 +34,8 @@ object CircuitBreaker { * * @param scheduler Reference to Akka scheduler * @param maxFailures Maximum number of failures before opening the circuit - * @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure - * @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit + * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure + * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit */ def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(syncExecutionContext) @@ -48,8 +48,8 @@ object CircuitBreaker { * * @param scheduler Reference to Akka scheduler * @param maxFailures Maximum number of failures before opening the circuit - * @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure - * @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit + * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure + * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit */ def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = apply(scheduler, maxFailures, callTimeout, resetTimeout) @@ -71,8 +71,8 @@ object CircuitBreaker { * * @param scheduler Reference to Akka scheduler * @param maxFailures Maximum number of failures before opening the circuit - * @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure - * @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit + * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure + * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners */ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { @@ -457,7 +457,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite /** * Calculate remaining timeout to inform the caller in case a backoff algorithm is useful * - * @return [[akka.util.Deadline]] to when the breaker will attempt a reset by transitioning to half-open + * @return [[scala.concurrent.duration.Deadline]] to when the breaker will attempt a reset by transitioning to half-open */ private def remainingTimeout(): Deadline = get match { case 0L ⇒ Deadline.now diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 79c31cda33..d3bef92e6c 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -18,7 +18,7 @@ import java.util.Arrays * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) { +class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) { import ConsistentHash._ diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 47e9d54071..7400de9810 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -181,7 +181,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex } /** - * For internal use only. Converts Strings to [[scala.concurrent.util.Duration]] + * For internal use only. Converts Strings to [[scala.concurrent.duration.Duration]] */ private[camel] object DurationTypeConverter extends TypeConverterSupport { diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 4347f6c0b0..a1215f4563 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -78,6 +78,10 @@ akka { # how often should the node send out heartbeats? heartbeat-interval = 1s + # Number of member nodes that each member will send heartbeat messages to, + # i.e. each node will be monitored by this number of other nodes. + monitored-by-nr-of-members = 5 + # defines the failure detector threshold # A low threshold is prone to generate many wrong suspicions but ensures # a quick detection in the event of a real crash. Conversely, a high diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 69d3f6db03..13f93d0482 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -60,7 +60,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { val settings = new ClusterSettings(system.settings.config, system.name) import settings._ - val selfAddress = system.provider match { + val selfAddress: Address = system.provider match { case c: ClusterActorRefProvider ⇒ c.transport.address case other ⇒ throw new ConfigurationException( "ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]". @@ -72,7 +72,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { log.info("Cluster Node [{}] - is starting up...", selfAddress) - val failureDetector = { + val failureDetector: FailureDetector = { import settings.{ FailureDetectorImplementationClass ⇒ fqcn } system.dynamicAccess.createInstanceFor[FailureDetector]( fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index d23c06f443..9a69922521 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -153,8 +153,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac withDispatcher(context.props.dispatcher), name = "publisher") val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)). withDispatcher(context.props.dispatcher), name = "core") - context.actorOf(Props[ClusterHeartbeatDaemon]. - withDispatcher(context.props.dispatcher), name = "heartbeat") + context.actorOf(Props[ClusterHeartbeatReceiver]. + withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)). withDispatcher(context.props.dispatcher), name = "metrics") @@ -170,26 +170,24 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging { import ClusterLeaderAction._ import InternalClusterAction._ - import ClusterHeartbeatSender._ + import ClusterHeartbeatSender.JoinInProgress val cluster = Cluster(context.system) import cluster.{ selfAddress, scheduler, failureDetector } import cluster.settings._ val vclockNode = VectorClock.Node(selfAddress.toString) - val selfHeartbeat = Heartbeat(selfAddress) // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet var latestGossip: Gossip = Gossip() - var joinInProgress: Map[Address, Deadline] = Map.empty var stats = ClusterStats() - val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. - withDispatcher(UseDispatcher), name = "heartbeatSender") val coreSender = context.actorOf(Props[ClusterCoreSender]. withDispatcher(UseDispatcher), name = "coreSender") + val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. + withDispatcher(UseDispatcher), name = "heartbeatSender") import context.dispatcher @@ -197,10 +195,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval, self, GossipTick) - // start periodic heartbeat to all nodes in cluster - val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval), - HeartbeatInterval, self, HeartbeatTick) - // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) val failureDetectorReaperTask = scheduler.schedule(PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval, self, ReapUnreachableTick) @@ -221,7 +215,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto override def postStop(): Unit = { gossipTask.cancel() - heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() publishStatsTask foreach { _.cancel() } @@ -239,7 +232,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() - case HeartbeatTick ⇒ heartbeat() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() case PublishStatsTick ⇒ publishInternalStats() @@ -282,12 +274,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localGossip = latestGossip // wipe our state since a node that joins a cluster must be empty latestGossip = Gossip() - joinInProgress = Map(address -> (Deadline.now + JoinTimeout)) // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() publish(localGossip) + heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout) context.become(initialized) if (address == selfAddress) @@ -506,12 +498,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer else remoteGossip // remote gossip is newer - val newJoinInProgress = - if (joinInProgress.isEmpty) joinInProgress - else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address) - latestGossip = winningGossip seen selfAddress - joinInProgress = newJoinInProgress // for all new joining nodes we remove them from the failure detector (latestGossip.members -- localGossip.members).foreach { @@ -733,27 +720,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } } - def heartbeat(): Unit = { - removeOverdueJoinInProgress() - - val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys - - val deadline = Deadline.now + HeartbeatInterval - beatTo.foreach { address ⇒ if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) } - } - - /** - * Removes overdue joinInProgress from State. - */ - def removeOverdueJoinInProgress(): Unit = { - joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } - } - /** * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. */ def reapUnreachableMembers(): Unit = { - if (!isSingletonCluster && isAvailable) { // only scrutinize if we are a non-singleton cluster and available diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index b4dffff0fa..11a6769f71 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -5,29 +5,47 @@ package akka.cluster import language.postfixOps -import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } -import java.security.MessageDigest -import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } +import scala.collection.immutable.SortedSet +import scala.annotation.tailrec import scala.concurrent.duration._ +import java.net.URLEncoder +import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props } +import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } +import akka.cluster.ClusterEvent._ +import akka.routing.ConsistentHash /** - * Sent at regular intervals for failure detection. + * INTERNAL API */ -case class Heartbeat(from: Address) extends ClusterMessage +private[akka] object ClusterHeartbeatReceiver { + /** + * Sent at regular intervals for failure detection. + */ + case class Heartbeat(from: Address) extends ClusterMessage + + /** + * Tell failure detector at receiving side that it should + * remove the monitoring, because heartbeats will end from + * this node. + */ + case class EndHeartbeat(from: Address) extends ClusterMessage +} /** * INTERNAL API. * - * Receives Heartbeat messages and delegates to Cluster. + * Receives Heartbeat messages and updates failure detector. * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized * to Cluster message after message, but concurrent with other types of messages. */ -private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { + import ClusterHeartbeatReceiver._ val failureDetector = Cluster(context.system).failureDetector def receive = { - case Heartbeat(from) ⇒ failureDetector heartbeat from + case Heartbeat(from) ⇒ failureDetector heartbeat from + case EndHeartbeat(from) ⇒ failureDetector remove from } } @@ -37,69 +55,271 @@ private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogg */ private[cluster] object ClusterHeartbeatSender { /** - * - * Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]] - * to the other node. + * Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of + * another node and heartbeats should be sent unconditionally until it becomes + * member or deadline is overdue. This is done to be able to detect immediate death + * of the joining node. * Local only, no need to serialize. */ - case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) + case class JoinInProgress(address: Address, deadline: Deadline) } /* * INTERNAL API * * This actor is responsible for sending the heartbeat messages to - * other nodes. Netty blocks when sending to broken connections. This actor - * isolates sending to different nodes by using child workers for each target + * a few other nodes that will monitor this node. + * + * Netty blocks when sending to broken connections. This actor + * isolates sending to different nodes by using child actors for each target * address and thereby reduce the risk of irregular heartbeats to healty * nodes due to broken connections to other nodes. */ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ + import ClusterHeartbeatSenderConnection._ + import ClusterHeartbeatReceiver._ + import InternalClusterAction.HeartbeatTick + + val cluster = Cluster(context.system) + import cluster.{ selfAddress, scheduler } + import cluster.settings._ + import context.dispatcher + + val selfHeartbeat = Heartbeat(selfAddress) + val selfEndHeartbeat = EndHeartbeat(selfAddress) + + var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor), + selfAddress.toString, MonitoredByNrOfMembers) + + // start periodic heartbeat to other nodes in cluster + val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], + HeartbeatInterval, self, HeartbeatTick) + + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + + override def postStop(): Unit = { + heartbeatTask.cancel() + cluster.unsubscribe(self) + } /** * Looks up and returns the remote cluster heartbeat connection for the specific address. */ def clusterHeartbeatConnectionFor(address: Address): ActorRef = - context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") - - val digester = MessageDigest.getInstance("MD5") - - /** - * Child name is MD5 hash of the address. - * FIXME Change to URLEncode when ticket #2123 has been fixed - */ - def encodeChildName(name: String): String = { - digester update name.getBytes("UTF-8") - digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString - } + context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") def receive = { - case msg @ SendHeartbeat(from, to, deadline) ⇒ - val workerName = encodeChildName(to.toString) - val worker = context.actorFor(workerName) match { + case HeartbeatTick ⇒ heartbeat() + case s: CurrentClusterState ⇒ reset(s) + case MemberUnreachable(m) ⇒ removeMember(m) + case MemberRemoved(m) ⇒ removeMember(m) + case e: MemberEvent ⇒ addMember(e.member) + case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) + } + + def reset(snapshot: CurrentClusterState): Unit = + state = state.reset(snapshot.members.collect { case m if m.address != selfAddress ⇒ m.address }) + + def addMember(m: Member): Unit = if (m.address != selfAddress) + state = state addMember m.address + + def removeMember(m: Member): Unit = if (m.address != selfAddress) + state = state removeMember m.address + + def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress) + state = state.addJoinInProgress(address, deadline) + + def heartbeat(): Unit = { + state = state.removeOverdueJoinInProgress() + + def connection(to: Address): ActorRef = { + // URL encoded target address as child actor name + val connectionName = URLEncoder.encode(to.toString, "UTF-8") + context.actorFor(connectionName) match { case notFound if notFound.isTerminated ⇒ - context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) + context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName) case child ⇒ child } - worker ! msg + } + + val deadline = Deadline.now + HeartbeatInterval + state.active foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) } + + // When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is + // sent to notify it that no more heartbeats will be sent. + for ((to, count) ← state.ending) { + val c = connection(to) + c ! SendEndHeartbeat(selfEndHeartbeat, to) + if (count == NumberOfEndHeartbeats) { + state = state.removeEnding(to) + c ! PoisonPill + } else + state = state.increaseEndingCount(to) + } } } /** - * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address. + * INTERNAL API + */ +private[cluster] object ClusterHeartbeatSenderState { + /** + * Initial, empty state + */ + def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String, + monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers) + + /** + * Create a new state based on previous state, and + * keep track of which nodes to stop sending heartbeats to. + */ + private def apply( + old: ClusterHeartbeatSenderState, + consistentHash: ConsistentHash[Address], + all: Set[Address]): ClusterHeartbeatSenderState = { + + /** + * Select a few peers that heartbeats will be sent to, i.e. that will + * monitor this node. Try to send heartbeats to same nodes as much + * as possible, but re-balance with consistent hashing algorithm when + * new members are added or removed. + */ + def selectPeers: Set[Address] = { + val allSize = all.size + val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers) + // try more if consistentHash results in same node as already selected + val attemptLimit = nrOfPeers * 2 + @tailrec def select(acc: Set[Address], n: Int): Set[Address] = { + if (acc.size == nrOfPeers || n == attemptLimit) acc + else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1) + } + if (nrOfPeers >= allSize) all + else select(Set.empty[Address], 0) + } + + val curr = selectPeers + // start ending process for nodes not selected any more + // abort ending process for nodes that have been selected again + val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr + old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end) + } + +} + +/** + * INTERNAL API * - * Netty blocks when sending to broken connections, and this actor uses - * a configurable circuit breaker to reduce connect attempts to broken + * State used by [akka.cluster.ClusterHeartbeatSender]. + * The initial state is created with `empty` in the of + * the companion object, thereafter the state is modified + * with the methods, such as `addMember`. It is immutable, + * i.e. the methods return new instances. + */ +private[cluster] case class ClusterHeartbeatSenderState private ( + consistentHash: ConsistentHash[Address], + selfAddressStr: String, + monitoredByNrOfMembers: Int, + all: Set[Address] = Set.empty, + current: Set[Address] = Set.empty, + ending: Map[Address, Int] = Map.empty, + joinInProgress: Map[Address, Deadline] = Map.empty) { + + // FIXME can be disabled as optimization + assertInvariants + + private def assertInvariants: Unit = { + val currentAndEnding = current.intersect(ending.keySet) + require(currentAndEnding.isEmpty, + "Same nodes in current and ending not allowed, got [%s]" format currentAndEnding) + val joinInProgressAndAll = joinInProgress.keySet.intersect(all) + require(joinInProgressAndAll.isEmpty, + "Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll) + val currentNotInAll = current -- all + require(currentNotInAll.isEmpty, + "Nodes in current but not in all not allowed, got [%s]" format currentNotInAll) + require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]" + format all) + } + + val active: Set[Address] = current ++ joinInProgress.keySet + + def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ }, + consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), + all = nodes) + + def addMember(a: Address): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a) + + def removeMember(a: Address): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a) + + private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { + if (joinInProgress contains address) + copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0)) + else this + } + + def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = { + if (all contains address) this + else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address) + } + + /** + * Cleanup overdue joinInProgress, in case a joining node never + * became member, for some reason. + */ + def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = { + val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } + if (overdue.isEmpty) this + else + copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue) + } + + def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a) + + def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1))) + +} + +/** + * INTERNAL API + */ +private[cluster] object ClusterHeartbeatSenderConnection { + import ClusterHeartbeatReceiver._ + + /** + * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send + * [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node. + * Local only, no need to serialize. + */ + case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) + + /** + * Command to [akka.cluster.ClusterHeartbeatSenderConnection]], which will send + * [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node. + * Local only, no need to serialize. + */ + case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address) +} + +/** + * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] + * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address. + * + * This actor exists only because Netty blocks when sending to broken connections, + * and this actor uses a configurable circuit breaker to reduce connect attempts to broken * connections. * - * @see ClusterHeartbeatSender + * @see akka.cluster.ClusterHeartbeatSender */ -private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) +private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef) extends Actor with ActorLogging { - import ClusterHeartbeatSender._ + import ClusterHeartbeatSenderConnection._ val breaker = { val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings @@ -110,21 +330,19 @@ private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) } - // make sure it will cleanup when not used any more - context.setReceiveTimeout(30 seconds) - def receive = { case SendHeartbeat(heartbeatMsg, _, deadline) ⇒ if (!deadline.isOverdue) { - // the CircuitBreaker will measure elapsed time and open if too many long calls + log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) + // Netty blocks when sending to broken connections, the CircuitBreaker will + // measure elapsed time and open if too many long calls try breaker.withSyncCircuitBreaker { - log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef) toRef ! heartbeatMsg - if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef) } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ } } - - case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used - + if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef) + case SendEndHeartbeat(endHeartbeatMsg, _) ⇒ + log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef) + toRef ! endHeartbeatMsg } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index babafd5b21..b8fa31fbc3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -16,14 +16,34 @@ import scala.concurrent.duration.FiniteDuration class ClusterSettings(val config: Config, val systemName: String) { import config._ - final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold") - final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") - final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class") - final val FailureDetectorMinStdDeviation: FiniteDuration = - Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) - final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = - Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) - final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + final val FailureDetectorThreshold: Double = { + val x = getDouble("akka.cluster.failure-detector.threshold") + require(x > 0.0, "failure-detector.threshold must be > 0") + x + } + final val FailureDetectorMaxSampleSize: Int = { + val n = getInt("akka.cluster.failure-detector.max-sample-size") + require(n > 0, "failure-detector.max-sample-size must be > 0"); n + } + final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class") + final val FailureDetectorMinStdDeviation: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS) + require(d > Duration.Zero, "failure-detector.min-std-deviation must be > 0"); d + } + final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) + require(d >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0"); d + } + final val HeartbeatInterval: FiniteDuration = { + val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS) + require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d + } + final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration + final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt + final val MonitoredByNrOfMembers: Int = { + val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members") + require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n + } final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 52a9a55e21..bdbb1297fb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -196,7 +196,7 @@ private[akka] class ClusterRouteeProvider( private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees - val currentNodes = availbleNodes + val currentNodes = availableNodes if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { None } else { @@ -222,7 +222,7 @@ private[akka] class ClusterRouteeProvider( case a ⇒ a } - private[routing] def availbleNodes: SortedSet[Address] = { + private[routing] def availableNodes: SortedSet[Address] = { import Member.addressOrdering val currentNodes = nodes if (currentNodes.isEmpty && settings.allowLocalRoutees) @@ -236,11 +236,11 @@ private[akka] class ClusterRouteeProvider( private[routing] var nodes: SortedSet[Address] = { import Member.addressOrdering cluster.readView.members.collect { - case m if isAvailble(m) ⇒ m.address + case m if isAvailable(m) ⇒ m.address } } - private[routing] def isAvailble(m: Member): Boolean = { + private[routing] def isAvailable(m: Member): Boolean = { m.status == MemberStatus.Up && (settings.allowLocalRoutees || m.address != cluster.selfAddress) } @@ -271,10 +271,10 @@ private[akka] class ClusterRouterActor extends Router { override def routerReceive: Receive = { case s: CurrentClusterState ⇒ import Member.addressOrdering - routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailble(m) ⇒ m.address } + routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailable(m) ⇒ m.address } routeeProvider.createRoutees() - case m: MemberEvent if routeeProvider.isAvailble(m.member) ⇒ + case m: MemberEvent if routeeProvider.isAvailable(m.member) ⇒ routeeProvider.nodes += m.member.address // createRoutees will create routees based on // totalInstances and maxInstancesPerNode diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index ec9a9d4b5b..6465c5ead8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -39,7 +39,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { gossip-interval = 500 ms auto-join = off auto-down = on - failure-detector.acceptable-heartbeat-pause = 10s + failure-detector.acceptable-heartbeat-pause = 5s publish-stats-interval = 0 s # always, when it happens } akka.event-handlers = ["akka.testkit.TestEventListener"] @@ -54,7 +54,9 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { akka.scheduler.tick-duration = 33 ms akka.remote.log-remote-lifecycle-events = off akka.remote.netty.execution-pool-size = 4 - #akka.remote.netty.reconnection-time-window = 1s + #akka.remote.netty.reconnection-time-window = 10s + akka.remote.netty.read-timeout = 5s + akka.remote.netty.write-timeout = 5s akka.remote.netty.backoff-timeout = 500ms akka.remote.netty.connection-timeout = 500ms diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index b16102e398..ddc9a46ecb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -4,10 +4,12 @@ package akka.cluster +import language.postfixOps import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ +import scala.concurrent.duration._ case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val controller = role("controller") @@ -69,7 +71,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig val leaderAddress = address(leader) enterBarrier("before-shutdown" + n) testConductor.shutdown(leader, 0) - enterBarrier("after-shutdown" + n, "after-down" + n, "completed" + n) + enterBarrier("after-shutdown" + n, "after-unavailable" + n, "after-down" + n, "completed" + n) case `leader` ⇒ enterBarrier("before-shutdown" + n, "after-shutdown" + n) @@ -78,15 +80,25 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig case `aUser` ⇒ val leaderAddress = address(leader) enterBarrier("before-shutdown" + n, "after-shutdown" + n) + + // detect failure + markNodeAsUnavailable(leaderAddress) + awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress)) + enterBarrier("after-unavailable" + n) + // user marks the shutdown leader as DOWN cluster.down(leaderAddress) enterBarrier("after-down" + n, "completed" + n) - markNodeAsUnavailable(leaderAddress) case _ if remainingRoles.contains(myself) ⇒ // remaining cluster nodes, not shutdown - enterBarrier("before-shutdown" + n, "after-shutdown" + n, "after-down" + n) + val leaderAddress = address(leader) + enterBarrier("before-shutdown" + n, "after-shutdown" + n) + awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress)) + enterBarrier("after-unavailable" + n) + + enterBarrier("after-down" + n) awaitUpConvergence(currentRoles.size - 1) val nextExpectedLeader = remainingRoles.head clusterView.isLeader must be(myself == nextExpectedLeader) @@ -97,12 +109,12 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig } } - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in within(20 seconds) { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) enterBarrier("after-2") } - "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in { + "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in within(20 seconds) { shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) enterBarrier("after-3") } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 360b82f04f..2d1a6542bd 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -28,6 +28,8 @@ class ClusterConfigSpec extends AkkaSpec { PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) + NumberOfEndHeartbeats must be(4) + MonitoredByNrOfMembers must be(5) LeaderActionsInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second) PublishStatsInterval must be(10 second) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala new file mode 100644 index 0000000000..4eedee1df4 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.actor.Address +import akka.routing.ConsistentHash +import scala.concurrent.duration._ + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { + + val selfAddress = Address("akka", "sys", "myself", 2552) + val aa = Address("akka", "sys", "aa", 2552) + val bb = Address("akka", "sys", "bb", 2552) + val cc = Address("akka", "sys", "cc", 2552) + val dd = Address("akka", "sys", "dd", 2552) + val ee = Address("akka", "sys", "ee", 2552) + + val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10), + selfAddress.toString, 3) + + "A ClusterHeartbeatSenderState" must { + + "return empty active set when no nodes" in { + emptyState.active.isEmpty must be(true) + } + + "include joinInProgress in active set" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds) + s.joinInProgress.keySet must be(Set(aa)) + s.active must be(Set(aa)) + } + + "remove joinInProgress from active set after removeOverdueJoinInProgress" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress() + s.joinInProgress must be(Map.empty) + s.active must be(Set.empty) + s.ending must be(Map(aa -> 0)) + } + + "remove joinInProgress after reset" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)) + s.joinInProgress must be(Map.empty) + } + + "remove joinInProgress after addMember" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa) + s.joinInProgress must be(Map.empty) + } + + "remove joinInProgress after removeMember" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa) + s.joinInProgress must be(Map.empty) + s.ending must be(Map(aa -> 0)) + } + + "remove from ending after addJoinInProgress" in { + val s = emptyState.reset(Set(aa, bb)).removeMember(aa) + s.ending must be(Map(aa -> 0)) + val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds) + s2.joinInProgress.keySet must be(Set(aa)) + s2.ending must be(Map.empty) + } + + "include nodes from reset in active set" in { + val nodes = Set(aa, bb, cc) + val s = emptyState.reset(nodes) + s.all must be(nodes) + s.current must be(nodes) + s.ending must be(Map.empty) + s.active must be(nodes) + } + + "limit current nodes to monitoredByNrOfMembers when adding members" in { + val nodes = Set(aa, bb, cc, dd) + val s = nodes.foldLeft(emptyState) { _ addMember _ } + s.all must be(nodes) + s.current.size must be(3) + s.addMember(ee).current.size must be(3) + } + + "move meber to ending set when removing member" in { + val nodes = Set(aa, bb, cc, dd, ee) + val s = emptyState.reset(nodes) + s.ending must be(Map.empty) + val included = s.current.head + val s2 = s.removeMember(included) + s2.ending must be(Map(included -> 0)) + s2.current must not contain (included) + val s3 = s2.addMember(included) + s3.current must contain(included) + s3.ending.keySet must not contain (included) + } + + "increase ending count correctly" in { + val s = emptyState.reset(Set(aa)).removeMember(aa) + s.ending must be(Map(aa -> 0)) + val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa) + s2.ending must be(Map(aa -> 2)) + } + + } +} diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java index dbaa9b4100..412e742849 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -8,7 +8,7 @@ package docs.circuitbreaker; import akka.actor.UntypedActor; import scala.concurrent.Future; import akka.event.LoggingAdapter; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.pattern.CircuitBreaker; import akka.event.Logging; diff --git a/akka-docs/rst/common/code/docs/duration/Java.java b/akka-docs/rst/common/code/docs/duration/Java.java index 06bea4d3e3..cd46c2c822 100644 --- a/akka-docs/rst/common/code/docs/duration/Java.java +++ b/akka-docs/rst/common/code/docs/duration/Java.java @@ -5,15 +5,15 @@ package docs.duration; //#import -import scala.concurrent.util.Duration; -import scala.concurrent.util.Deadline; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.Deadline; //#import class Java { public void demo() { //#dsl final Duration fivesec = Duration.create(5, "seconds"); - final Duration threemillis = Duration.parse("3 millis"); + final Duration threemillis = Duration.create("3 millis"); final Duration diff = fivesec.minus(threemillis); assert diff.lt(fivesec); assert Duration.Zero().lt(Duration.Inf()); diff --git a/akka-docs/rst/dev/developer-guidelines.rst b/akka-docs/rst/dev/developer-guidelines.rst index 903f2d64d9..17665c34e2 100644 --- a/akka-docs/rst/dev/developer-guidelines.rst +++ b/akka-docs/rst/dev/developer-guidelines.rst @@ -3,6 +3,8 @@ Developer Guidelines ==================== +First read: `The Akka Contributor Guidelines `_ . + Code Style ---------- diff --git a/akka-docs/rst/general/supervision.rst b/akka-docs/rst/general/supervision.rst index c28bbfc4f2..2b747316b2 100644 --- a/akka-docs/rst/general/supervision.rst +++ b/akka-docs/rst/general/supervision.rst @@ -189,6 +189,11 @@ external resource, which may also be one of its own children. If a third party terminates a child by way of the ``system.stop(child)`` method or sending a :class:`PoisonPill`, the supervisor might well be affected. +..warning:: + + DeathWatch for Akka Remote does not (yet) get triggered by connection failures. + This feature may be added in a future release of Akka Remoting. + One-For-One Strategy vs. All-For-One Strategy --------------------------------------------- diff --git a/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java b/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java index 7db5715e31..fdd1937014 100644 --- a/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/FaultHandlingTestBase.java @@ -14,7 +14,7 @@ import akka.actor.Terminated; import akka.actor.UntypedActor; import scala.concurrent.Await; import static akka.pattern.Patterns.ask; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.testkit.AkkaSpec; import akka.testkit.TestProbe; @@ -41,7 +41,7 @@ public class FaultHandlingTestBase { //#strategy private static SupervisorStrategy strategy = - new OneForOneStrategy(10, Duration.parse("1 minute"), + new OneForOneStrategy(10, Duration.create("1 minute"), new Function() { @Override public Directive apply(Throwable t) { @@ -81,7 +81,7 @@ public class FaultHandlingTestBase { //#strategy2 private static SupervisorStrategy strategy = new OneForOneStrategy(10, - Duration.parse("1 minute"), + Duration.create("1 minute"), new Function() { @Override public Directive apply(Throwable t) { diff --git a/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java b/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java index b1fb899be7..1c09272582 100644 --- a/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java +++ b/akka-docs/rst/java/code/docs/actor/MyReceivedTimeoutUntypedActor.java @@ -6,18 +6,23 @@ package docs.actor; //#receive-timeout import akka.actor.ReceiveTimeout; import akka.actor.UntypedActor; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; public class MyReceivedTimeoutUntypedActor extends UntypedActor { public MyReceivedTimeoutUntypedActor() { - getContext().setReceiveTimeout(Duration.parse("30 seconds")); + // To set an initial delay + getContext().setReceiveTimeout(Duration.create("30 seconds")); } public void onReceive(Object message) { if (message.equals("Hello")) { + // To set in a response to a message + getContext().setReceiveTimeout(Duration.create("10 seconds")); getSender().tell("Hello world", getSelf()); } else if (message == ReceiveTimeout.getInstance()) { + // To turn it off + getContext().setReceiveTimeout(Duration.Undefined()); throw new RuntimeException("received timeout"); } else { unhandled(message); diff --git a/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java b/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java index 34f56715d6..0b3d55f33f 100644 --- a/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java @@ -5,7 +5,7 @@ package docs.actor; //#imports1 import akka.actor.Props; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit; //#imports1 diff --git a/akka-docs/rst/java/code/docs/actor/TypedActorDocTestBase.java b/akka-docs/rst/java/code/docs/actor/TypedActorDocTestBase.java index 3f0e2bdb09..35c8441263 100644 --- a/akka-docs/rst/java/code/docs/actor/TypedActorDocTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/TypedActorDocTestBase.java @@ -11,7 +11,7 @@ import akka.japi.*; import akka.dispatch.Futures; import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit; //#imports diff --git a/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java b/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java index 95da8a7cd1..d825858239 100644 --- a/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java @@ -14,7 +14,7 @@ import scala.concurrent.Future; import akka.dispatch.Futures; import akka.dispatch.Mapper; import scala.concurrent.Await; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.util.Timeout; //#import-future @@ -35,7 +35,7 @@ import akka.actor.Terminated; import static akka.pattern.Patterns.gracefulStop; import scala.concurrent.Future; import scala.concurrent.Await; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.pattern.AskTimeoutException; //#import-gracefulStop @@ -44,7 +44,7 @@ import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe; import scala.concurrent.Future; import akka.dispatch.Futures; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.util.Timeout; import java.util.concurrent.TimeUnit; import java.util.ArrayList; @@ -192,7 +192,7 @@ public class UntypedActorDocTestBase { ActorSystem system = ActorSystem.create("MySystem"); ActorRef myActor = system.actorOf(new Props(WatchActor.class)); Future future = Patterns.ask(myActor, "kill", 1000); - assert Await.result(future, Duration.parse("1 second")).equals("finished"); + assert Await.result(future, Duration.create("1 second")).equals("finished"); system.shutdown(); } diff --git a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java index f724cbafbc..f3db04cfdf 100644 --- a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -13,7 +13,7 @@ import java.util.Map; import akka.actor.*; import akka.dispatch.Mapper; import akka.japi.Function; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.util.Timeout; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -62,7 +62,7 @@ public class FaultHandlingDocSample { public void preStart() { // If we don't get any progress within 15 seconds then the service // is unavailable - getContext().setReceiveTimeout(Duration.parse("15 seconds")); + getContext().setReceiveTimeout(Duration.create("15 seconds")); } public void onReceive(Object msg) { @@ -237,7 +237,7 @@ public class FaultHandlingDocSample { // Restart the storage child when StorageException is thrown. // After 3 restarts within 5 seconds it will be stopped. private static SupervisorStrategy strategy = new OneForOneStrategy(3, - Duration.parse("5 seconds"), new Function() { + Duration.create("5 seconds"), new Function() { @Override public Directive apply(Throwable t) { if (t instanceof StorageException) { diff --git a/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java b/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java index 4347cfb66a..10e369baeb 100644 --- a/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java +++ b/akka-docs/rst/java/code/docs/camel/ActivationTestBase.java @@ -8,8 +8,8 @@ package docs.camel; import akka.camel.javaapi.UntypedConsumerActor; import akka.util.Timeout; import scala.concurrent.Future; - import scala.concurrent.util.Duration; - import scala.concurrent.util.FiniteDuration; + import scala.concurrent.duration.Duration; + import scala.concurrent.duration.FiniteDuration; import static java.util.concurrent.TimeUnit.SECONDS; //#CamelActivation diff --git a/akka-docs/rst/java/code/docs/camel/Consumer4.java b/akka-docs/rst/java/code/docs/camel/Consumer4.java index 2074bc2c78..a41eba3869 100644 --- a/akka-docs/rst/java/code/docs/camel/Consumer4.java +++ b/akka-docs/rst/java/code/docs/camel/Consumer4.java @@ -2,8 +2,8 @@ package docs.camel; //#Consumer4 import akka.camel.CamelMessage; import akka.camel.javaapi.UntypedConsumerActor; -import scala.concurrent.util.Duration; -import scala.concurrent.util.FiniteDuration; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; diff --git a/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTestBase.java b/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTestBase.java index c4134413ac..72836b503d 100644 --- a/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTestBase.java +++ b/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTestBase.java @@ -9,7 +9,7 @@ import akka.actor.AbstractExtensionId; import akka.actor.ExtensionIdProvider; import akka.actor.ActorSystem; import akka.actor.ExtendedActorSystem; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import com.typesafe.config.Config; import java.util.concurrent.TimeUnit; diff --git a/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java b/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java index 7b1e1f2be5..5e9f70b8b0 100644 --- a/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java @@ -12,7 +12,7 @@ import akka.util.Timeout; //#imports1 //#imports2 -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.japi.Function; import java.util.concurrent.Callable; import static akka.dispatch.Futures.future; @@ -113,7 +113,7 @@ public class FutureDocTestBase { return "Hello" + "World"; } }, system.dispatcher()); - String result = (String) Await.result(f, Duration.create(1, SECONDS)); + String result = (String) Await.result(f, Duration.create(5, SECONDS)); //#future-eval assertEquals("HelloWorld", result); } @@ -135,7 +135,7 @@ public class FutureDocTestBase { } }, ec); - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); //#map } @@ -159,7 +159,7 @@ public class FutureDocTestBase { }, ec); //#map2 - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); } @@ -183,7 +183,7 @@ public class FutureDocTestBase { }, ec); //#map3 - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); } @@ -209,7 +209,7 @@ public class FutureDocTestBase { }, ec); //#flat-map - int result = Await.result(f2, Duration.create(1, SECONDS)); + int result = Await.result(f2, Duration.create(5, SECONDS)); assertEquals(10, result); } @@ -238,7 +238,7 @@ public class FutureDocTestBase { } }, ec); - long result = Await.result(futureSum, Duration.create(1, SECONDS)); + long result = Await.result(futureSum, Duration.create(5, SECONDS)); //#sequence assertEquals(3L, result); } @@ -262,7 +262,7 @@ public class FutureDocTestBase { }, ec); //Returns the sequence of strings as upper case - Iterable result = Await.result(futureResult, Duration.create(1, SECONDS)); + Iterable result = Await.result(futureResult, Duration.create(5, SECONDS)); assertEquals(Arrays.asList("A", "B", "C"), result); //#traverse } @@ -286,7 +286,7 @@ public class FutureDocTestBase { return r + t; //Just concatenate } }, ec); - String result = Await.result(resultFuture, Duration.create(1, SECONDS)); + String result = Await.result(resultFuture, Duration.create(5, SECONDS)); //#fold assertEquals("ab", result); @@ -310,7 +310,7 @@ public class FutureDocTestBase { } }, ec); - Object result = Await.result(resultFuture, Duration.create(1, SECONDS)); + Object result = Await.result(resultFuture, Duration.create(5, SECONDS)); //#reduce assertEquals("ab", result); @@ -326,10 +326,10 @@ public class FutureDocTestBase { Future otherFuture = Futures.failed( new IllegalArgumentException("Bang!")); //#failed - Object result = Await.result(future, Duration.create(1, SECONDS)); + Object result = Await.result(future, Duration.create(5, SECONDS)); assertEquals("Yay!", result); Throwable result2 = Await.result(otherFuture.failed(), - Duration.create(1, SECONDS)); + Duration.create(5, SECONDS)); assertEquals("Bang!", result2.getMessage()); } @@ -399,7 +399,7 @@ public class FutureDocTestBase { throw problem; } }, ec); - int result = Await.result(future, Duration.create(1, SECONDS)); + int result = Await.result(future, Duration.create(5, SECONDS)); assertEquals(result, 0); //#recover } @@ -425,7 +425,7 @@ public class FutureDocTestBase { throw problem; } }, ec); - int result = Await.result(future, Duration.create(1, SECONDS)); + int result = Await.result(future, Duration.create(5, SECONDS)); assertEquals(result, 0); //#try-recover } @@ -497,7 +497,7 @@ public class FutureDocTestBase { } }, ec); - String result = Await.result(future3, Duration.create(1, SECONDS)); + String result = Await.result(future3, Duration.create(5, SECONDS)); assertEquals("foo bar", result); //#zip } @@ -509,7 +509,7 @@ public class FutureDocTestBase { Future future3 = Futures.successful("bar"); // Will have "bar" in this case Future future4 = future1.fallbackTo(future2).fallbackTo(future3); - String result = Await.result(future4, Duration.create(1, SECONDS)); + String result = Await.result(future4, Duration.create(5, SECONDS)); assertEquals("bar", result); //#fallback-to } diff --git a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java index dc42707bfd..c4e7414ce1 100644 --- a/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/rst/java/code/docs/jrouting/CustomRouterDocTestBase.java @@ -19,7 +19,7 @@ import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.OneForOneStrategy; @@ -68,7 +68,7 @@ public class CustomRouterDocTestBase { public void demonstrateSupervisor() { //#supervision final SupervisorStrategy strategy = - new OneForOneStrategy(5, Duration.parse("1 minute"), + new OneForOneStrategy(5, Duration.create("1 minute"), new Class[] { Exception.class }); final ActorRef router = system.actorOf(new Props(MyActor.class) .withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy))); diff --git a/akka-docs/rst/java/code/docs/jrouting/ParentActor.java b/akka-docs/rst/java/code/docs/jrouting/ParentActor.java index c61e9d96f3..e3750bfd23 100644 --- a/akka-docs/rst/java/code/docs/jrouting/ParentActor.java +++ b/akka-docs/rst/java/code/docs/jrouting/ParentActor.java @@ -11,7 +11,7 @@ import akka.routing.SmallestMailboxRouter; import akka.actor.UntypedActor; import akka.actor.ActorRef; import akka.actor.Props; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.util.Timeout; import scala.concurrent.Future; import scala.concurrent.Await; diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java new file mode 100644 index 0000000000..e712eee146 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern; + +import akka.actor.*; +import akka.testkit.*; +import akka.testkit.TestEvent.Mute; +import akka.testkit.TestEvent.UnMute; +import org.junit.*; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class SchedulerPatternTest { + + static ActorSystem system; + + @BeforeClass + public static void setUp() { + system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf()); + } + + @AfterClass + public static void tearDown() { + system.shutdown(); + } + + static + //#schedule-constructor + public class ScheduleInConstructor extends UntypedActor { + + private final Cancellable tick = getContext().system().scheduler().schedule( + Duration.create(500, TimeUnit.MILLISECONDS), + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().dispatcher()); + //#schedule-constructor + // this variable and constructor is declared here to not show up in the docs + final ActorRef target; + public ScheduleInConstructor(ActorRef target) { + this.target = target; + } + //#schedule-constructor + + @Override + public void postStop() { + tick.cancel(); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // do something useful here + //#schedule-constructor + target.tell(message, getSelf()); + //#schedule-constructor + } + //#schedule-constructor + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-constructor + else { + unhandled(message); + } + } + } + //#schedule-constructor + + static + //#schedule-receive + public class ScheduleInReceive extends UntypedActor { + //#schedule-receive + // this variable and constructor is declared here to not show up in the docs + final ActorRef target; + public ScheduleInReceive(ActorRef target) { + this.target = target; + } + //#schedule-receive + + @Override + public void preStart() { + getContext().system().scheduler().scheduleOnce( + Duration.create(500, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().dispatcher()); + } + + // override postRestart so we don't call preStart and schedule a new message + @Override + public void postRestart(Throwable reason) { + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // send another periodic tick after the specified delay + getContext().system().scheduler().scheduleOnce( + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().dispatcher()); + // do something useful here + //#schedule-receive + target.tell(message, getSelf()); + //#schedule-receive + } + //#schedule-receive + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-receive + else { + unhandled(message); + } + } + } + //#schedule-receive + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInConstructor() { + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInConstructor(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2000 millis")); + }}; + } + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInReceive() { + + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInReceive(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2500 millis")); + }}; + } + + public static class TestSchedule extends JavaTestKit { + private ActorSystem system; + + public TestSchedule(ActorSystem system) { + super(system); + this.system = system; + } + + public void testSchedule(final JavaTestKit probe, Props props, + FiniteDuration startDuration, + FiniteDuration afterRestartDuration) { + Iterable filter = + Arrays.asList(new akka.testkit.EventFilter[]{ + (akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)}); + try { + system.eventStream().publish(new Mute(filter)); + + final ActorRef actor = system.actorOf(props); + new Within(startDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + actor.tell("restart", getRef()); + new Within(afterRestartDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + system.stop(actor); + } + finally { + system.eventStream().publish(new UnMute(filter)); + } + } + } +} diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java index 14a51f9957..89253110ff 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java @@ -26,7 +26,7 @@ import akka.testkit.TestActor; import akka.testkit.TestActor.AutoPilot; import akka.testkit.TestActorRef; import akka.testkit.JavaTestKit; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; public class TestKitDocTest { diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java index b86cc366da..fc8178b7f2 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java @@ -14,7 +14,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import akka.testkit.JavaTestKit; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; public class TestKitSampleTest { diff --git a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java b/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java index 5a761c3cfe..d9d09a9bac 100644 --- a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java +++ b/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java @@ -30,7 +30,7 @@ import akka.actor.UntypedActor; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; -import scala.concurrent.util.Duration; +import scala.concurrent.duration.Duration; import akka.serialization.SerializationExtension; import akka.serialization.Serialization; import java.io.Serializable; diff --git a/akka-docs/rst/java/howto.rst b/akka-docs/rst/java/howto.rst index 204d50e7dc..922d318c75 100644 --- a/akka-docs/rst/java/howto.rst +++ b/akka-docs/rst/java/howto.rst @@ -17,6 +17,37 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library You might find some of the patterns described in the Scala chapter of :ref:`howto-scala` useful even though the example code is written in Scala. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduled periodic message send will be restarted with the actor on restarts. + This also means that the time period that elapses between two tick messages during a restart may drift + off based on when you restart the scheduled message sends relative to the time that the last message was + sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``. + +.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-receive + Template Pattern ================ @@ -33,4 +64,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. - diff --git a/akka-docs/rst/java/logging.rst b/akka-docs/rst/java/logging.rst index eefce2b35d..0f857837c5 100644 --- a/akka-docs/rst/java/logging.rst +++ b/akka-docs/rst/java/logging.rst @@ -194,8 +194,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4 ch.qos.logback logback-classic - 1.0.4 - runtime + 1.0.7 You need to enable the Slf4jEventHandler in the 'event-handlers' element in diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index 685a0903d5..2ee8bc397f 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -431,13 +431,20 @@ defaults to a 'dead-letter' actor ref. getSender().tell(result); // will have dead-letter actor as default } -Initial receive timeout -======================= +Receive timeout +=============== -A timeout mechanism can be used to receive a message when no initial message is -received within a certain time. To receive this timeout you have to set the -``receiveTimeout`` property and declare handing for the ReceiveTimeout -message. +The `UntypedActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which +the sending of a `ReceiveTimeout` message is triggered. +When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message. +1 millisecond is the minimum supported timeout. + +Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after +another message was enqueued; hence it is **not guaranteed** that upon reception of the receive +timeout there must have been an idle period beforehand as configured via this method. + +Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity +periods). Pass in `Duration.Undefined` to switch off this feature. .. includecode:: code/docs/actor/MyReceivedTimeoutUntypedActor.java#receive-timeout diff --git a/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst index ae59cf2d7c..bf2a7baf73 100644 --- a/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/rst/project/migration-guide-2.0.x-2.1.x.rst @@ -203,17 +203,17 @@ v2.0 Scala:: v2.1 Scala:: - val router2 = system.actorOf(Props[ExampleActor1].withRouter( - RoundRobinRouter(routees = routees))) + val router2 = system.actorOf(Props.empty.withRouter( + RoundRobinRouter(routees = routees))) v2.0 Java:: - ActorRef router2 = system.actorOf(new Props(ExampleActor.class).withRouter( + ActorRef router2 = system.actorOf(new Props().withRouter( RoundRobinRouter.create(routees))); v2.1 Java:: - ActorRef router2 = system.actorOf(new Props().withRouter( + ActorRef router2 = system.actorOf(Props.empty().withRouter( RoundRobinRouter.create(routees))); Props: Function-based creation diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index fea94dec0d..3000a2e55a 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -549,13 +549,20 @@ defaults to a 'dead-letter' actor ref. val result = process(request) sender ! result // will have dead-letter actor as default -Initial receive timeout -======================= +Receive timeout +=============== -A timeout mechanism can be used to receive a message when no initial message is -received within a certain time. To receive this timeout you have to set the -``receiveTimeout`` property and declare a case handing the ReceiveTimeout -object. +The `ActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which +the sending of a `ReceiveTimeout` message is triggered. +When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message. +1 millisecond is the minimum supported timeout. + +Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after +another message was enqueued; hence it is **not guaranteed** that upon reception of the receive +timeout there must have been an idle period beforehand as configured via this method. + +Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity +periods). Pass in `Duration.Undefined` to switch off this feature. .. includecode:: code/docs/actor/ActorDocSpec.scala#receive-timeout diff --git a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala index 15e542b264..611b7a43f5 100644 --- a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala @@ -270,10 +270,16 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { import akka.actor.ReceiveTimeout import scala.concurrent.duration._ class MyActor extends Actor { + // To set an initial delay context.setReceiveTimeout(30 milliseconds) def receive = { - case "Hello" ⇒ //... - case ReceiveTimeout ⇒ throw new RuntimeException("received timeout") + case "Hello" ⇒ + // To set in a response to a message + context.setReceiveTimeout(100 milliseconds) + case ReceiveTimeout ⇒ + // To turn it off + context.setReceiveTimeout(Duration.Undefined) + throw new RuntimeException("Receive timed out") } } //#receive-timeout diff --git a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala new file mode 100644 index 0000000000..fba8ed9ff9 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +import language.postfixOps + +import akka.actor.{ Props, ActorRef, Actor } +import scala.concurrent.duration._ +import akka.testkit.{ TimingTest, AkkaSpec, filterException } +import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor + +object SchedulerPatternSpec { + //#schedule-constructor + class ScheduleInConstructor extends Actor { + import context.dispatcher + val tick = + context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick") + //#schedule-constructor + // this var and constructor is declared here to not show up in the docs + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-constructor + + override def postStop() = tick.cancel() + + def receive = { + case "tick" ⇒ + // do something useful here + //#schedule-constructor + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-constructor + } + } + //#schedule-constructor + + //#schedule-receive + class ScheduleInReceive extends Actor { + import context._ + //#schedule-receive + // this var and constructor is declared here to not show up in the docs + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-receive + + override def preStart() = + system.scheduler.scheduleOnce(500 millis, self, "tick") + + // override postRestart so we don't call preStart and schedule a new message + override def postRestart(reason: Throwable) = {} + + def receive = { + case "tick" ⇒ + // send another periodic tick after the specified delay + system.scheduler.scheduleOnce(1000 millis, self, "tick") + // do something useful here + //#schedule-receive + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-receive + } + } + //#schedule-receive +} + +class SchedulerPatternSpec extends AkkaSpec { + + def testSchedule(actor: ActorRef, startDuration: FiniteDuration, + afterRestartDuration: FiniteDuration) = { + + filterException[ArithmeticException] { + within(startDuration) { + expectMsg("tick") + expectMsg("tick") + expectMsg("tick") + } + actor ! "restart" + within(afterRestartDuration) { + expectMsg("tick") + expectMsg("tick") + } + system.stop(actor) + } + } + + "send periodic ticks from the constructor" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2000 millis) + } + + "send ticks from the preStart and receive" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2500 millis) + } +} diff --git a/akka-docs/rst/scala/howto.rst b/akka-docs/rst/scala/howto.rst index 7d064e2491..dcdebe06db 100644 --- a/akka-docs/rst/scala/howto.rst +++ b/akka-docs/rst/scala/howto.rst @@ -111,6 +111,37 @@ This is where the Spider pattern comes in." The pattern is described `Discovering Message Flows in Actor System with the Spider Pattern `_. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduled periodic message send will be restarted with the actor on restarts. + This also means that the time period that elapses between two tick messages during a restart may drift + off based on when you restart the scheduled message sends relative to the time that the last message was + sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-receive + Template Pattern ================ @@ -127,4 +158,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. - diff --git a/akka-docs/rst/scala/logging.rst b/akka-docs/rst/scala/logging.rst index 60cd3f2a61..f8c3e11f27 100644 --- a/akka-docs/rst/scala/logging.rst +++ b/akka-docs/rst/scala/logging.rst @@ -232,7 +232,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4 .. code-block:: scala - lazy val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "runtime" + lazy val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" You need to enable the Slf4jEventHandler in the 'event-handlers' element in diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala index 51a189b7f9..6905b9b116 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala @@ -93,7 +93,7 @@ object LogRoleReplace extends ClipboardOwner { class LogRoleReplace { private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started with address \[akka://.*@([\w\-\.]+):([0-9]+)\]""".r - private val ColorCode = """\[[0-9]+m""" + private val ColorCode = """\u001B?\[[0-9]+m""" private var replacements: Map[String, String] = Map.empty diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 0c15039c06..5a3490b86d 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -65,7 +65,7 @@ object AkkaBuild extends Build { generatePdf in Sphinx <<= generatePdf in Sphinx in LocalProject(docs.id) map identity ), - aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, /*zeroMQ,*/ kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib) + aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib) ) lazy val actor = Project( @@ -215,15 +215,15 @@ object AkkaBuild extends Build { ) ) -// lazy val zeroMQ = Project( -// id = "akka-zeromq", -// base = file("akka-zeromq"), -// dependencies = Seq(actor, testkit % "test;test->test"), -// settings = defaultSettings ++ OSGi.zeroMQ ++ Seq( -// libraryDependencies ++= Dependencies.zeroMQ, -// previousArtifact := akkaPreviousArtifact("akka-zeromq") -// ) -// ) + lazy val zeroMQ = Project( + id = "akka-zeromq", + base = file("akka-zeromq"), + dependencies = Seq(actor, testkit % "test;test->test"), + settings = defaultSettings ++ OSGi.zeroMQ ++ Seq( + libraryDependencies ++= Dependencies.zeroMQ, + previousArtifact := akkaPreviousArtifact("akka-zeromq") + ) + ) lazy val kernel = Project( id = "akka-kernel", @@ -351,7 +351,7 @@ object AkkaBuild extends Build { id = "akka-docs", base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", - remote, cluster, slf4j, agent, dataflow, transactor, fileMailbox, /*zeroMQ,*/ camel, osgi, osgiAries), + remote, cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries), settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( sourceDirectory in Sphinx <<= baseDirectory / "rst", sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" }, @@ -648,7 +648,45 @@ object AkkaBuild extends Build { // Dependencies object Dependencies { - import Dependency._ + + object Compile { + // Compile + val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 + + val config = "com.typesafe" % "config" % "0.6.0" // ApacheV2 + val netty = "io.netty" % "netty" % "3.5.8.Final" // ApacheV2 + val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD + val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) + + val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.2" // MIT + val zeroMQClient = "org.zeromq" % "zeromq-scala-binding_2.10.0-M7" % "0.0.6" // ApacheV2 + val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 + val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2 + val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 + + + // Camel Sample + val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2 + + // Test + + object Test { + val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2 + val commonsIo = "commons-io" % "commons-io" % "2.0.1" % "test" // ApacheV2 + val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0 + val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" % "test" // EPL 1.0 / LGPL 2.1 + val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT + val scalatest = "org.scalatest" % "scalatest" % "1.8" % "test" cross CrossVersion.full // ApacheV2 + val scalacheck = "org.scalacheck" % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD + val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2 + val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2 + val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2 + val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 + val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT + } + } + + import Compile._ val actor = Seq(config) @@ -693,38 +731,3 @@ object Dependencies { val multiNodeSample = Seq(Test.scalatest) } -object Dependency { - // Compile - val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 - val config = "com.typesafe" % "config" % "0.5.2" // ApacheV2 - val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2 - val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD - val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) - - val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT - val zeroMQClient = "org.zeromq" % "zeromq-scala-binding" % "0.0.6" cross CrossVersion.full // ApacheV2 - val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2 - val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2 - val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 - - - // Camel Sample - val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2 - - // Test - - object Test { - val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2 - val commonsIo = "commons-io" % "commons-io" % "2.0.1" % "test" // ApacheV2 - val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0 - val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "test" // EPL 1.0 / LGPL 2.1 - val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT - val scalatest = "org.scalatest" % "scalatest" % "1.8" % "test" cross CrossVersion.full // ApacheV2 - val scalacheck = "org.scalacheck" % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD - val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2 - val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2 - val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2 - val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2 - val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT - } -}