Merge remote-tracking branch 'origin/master' into wip-2.10.0-RC1-∂π

- currently cheating: uses zeroMQ artifacts for scala 2.10M7
- fixed a bunch more wrong references to scala.concurrent.util
Roland 2012-10-15 16:18:52 +02:00
commit bff79c2f94
47 changed files with 1036 additions and 287 deletions

.gitignore

@@ -7,6 +7,7 @@ project/plugins/project
project/boot/*
*/project/build/target
*/project/boot
*/project/project.target.config-classes
lib_managed
etags
tags
@@ -67,3 +68,4 @@ redis/
beanstalk/
.scalastyle
bin/
.worksheet


@@ -1,57 +1,91 @@
#Contributing to Akka#
Greetings traveller!
##Infrastructure##
* [Akka Contributor License Agreement](http://www.typesafe.com/contribute/cla)
* [Akka Issue Tracker](http://doc.akka.io/docs/akka/current/project/issue-tracking.html)
* [Scalariform](https://github.com/mdr/scalariform)
##Workflow##
# Typesafe Project & Developer Guidelines
0. Sign the Akka Contributor License Agreement,
we won't accept anything from anybody who has not signed it.
1. Find-or-create a ticket in the issue tracker
2. Assign that ticket to yourself
3. Create a local branch with the following name format: wip-X-Y-Z
where the X is the number of the ticket in the tracker,
and Y is some brief keywords of the ticket title and Z is your initials or similar.
Example: wip-2373-add-contributing-md-√
4. Do what needs to be done (with tests and docs if applicable).
Your branch should pass all tests before going any further.
5. Push the branch to your clone of the Akka repository
6. Create a Pull Request onto the applicable Akka branch,
if the number of commits are more than a few, please squash the
commits first.
7. Change the status of your ticket to "Test"
8. The Pull Request will be reviewed by the Akka committers
9. Modify the Pull Request as agreed upon during the review,
then push the changes to your branch in your Akka repository,
the Pull Request should be automatically updated with the new
content.
10. Several cycles of review-then-change might occur.
11. Pull Request is either merged by the Akka committers,
or rejected, and the associated ticket will be updated to
reflect that.
12. Delete the local and remote wip-X-Y-Z
These guidelines are meant to be a living document that should be changed and adapted as needed. We encourage changes that make it easier to achieve our goals in an efficient way.
##Code Reviews##
These guidelines mainly apply to Typesafe's “mature” projects - not necessarily to projects that are mere collections of scripts, etc.
Akka utilizes peer code reviews to streamline the codebase, reduce the defect ratio,
increase maintainability and spread knowledge about how things are solved.
## General Workflow
Core review values:
This is the process for committing code into master. There are of course exceptions to these rules, for example minor changes to comments and documentation, or fixing a broken build.
* Rule: [The Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule)
- Why: Small improvements add up over time, keeping the codebase in shape.
* Rule: [Don't Repeat Yourself](http://programmer.97things.oreilly.com/wiki/index.php/Don't_Repeat_Yourself)
- Why: Repetitions are not maintainable, keeping things DRY makes it easier to fix bugs and refactor,
since you only need to apply the correction in one place, or perform the refactoring at one place.
* Rule: Feature tests > Integration tests > Unit tests
- Why: Without proving that a feature works, the code is only liability.
Without proving that a feature works with other features, the code is of limited value.
Without proving the individual parts of a feature works, the code is harder to debug.
1. Make sure you have signed the [Typesafe CLA](http://www.typesafe.com/contribute/cla), if not, sign it online.
2. Before starting to work on a feature or a fix, you have to make sure that:
1. There is a ticket for your work in the project's issue tracker. If not, create it first.
2. The ticket has been scheduled for the current milestone.
3. The ticket is estimated by the team.
4. The ticket has been discussed and prioritized by the team.
3. You should always perform your work in a Git feature branch. The branch should be given a descriptive name that explains its intent. Some teams also like adding the ticket number and/or the [GitHub](http://github.com) user ID to the branch name; these details are up to each individual team.
4. When the feature or fix is completed you should open a [Pull Request](https://help.github.com/articles/using-pull-requests) on GitHub.
5. The Pull Request should be reviewed by other maintainers (as many as feasible/practical). Note that the maintainers can include outside contributors, both within and outside Typesafe. Outside contributors (for example from EPFL or independent committers) are encouraged to participate in the review process; it is not a closed process.
6. After the review you should fix the issues as needed (pushing a new commit for new review etc.), iterating until the reviewers give their thumbs up.
7. Once the code has passed review the Pull Request can be merged into the master branch.
## Pull Request Requirements
For a Pull Request to be considered at all it has to meet these requirements:
1. Live up to the current code standard:
- Not violate [DRY](http://programmer.97things.oreilly.com/wiki/index.php/Don%27t_Repeat_Yourself).
- [Boy Scout Rule](http://programmer.97things.oreilly.com/wiki/index.php/The_Boy_Scout_Rule) needs to have been applied.
2. Regardless of whether the code introduces new features or fixes bugs or regressions, it must have comprehensive tests.
3. The code must be well documented in Typesafe's standard documentation format (see the Documentation section below).
If these requirements are not met then the code should **not** be merged into master, or even reviewed - regardless of how good or important it is. No exceptions.
## Continuous Integration
Each project should be configured to use a continuous integration (CI) tool (i.e. a build server à la Jenkins). Typesafe has a Jenkins server farm that can be used. The CI tool should, on each push to master, build the **full** distribution and run **all** tests, and if something fails it should email out a notification with the failure report to the committer and the core team. The CI tool should also be used in conjunction with Typesafe's Pull Request Validator (discussed below).
## Documentation
All documentation should be generated using the sbt-site-plugin, *or* publish artifacts to a repository that can be consumed by the Typesafe Stack.
All documentation must abide by the following maxims:
- Example code should be run as part of an automated test suite.
- Version should be **programmatically** specifiable to the build.
- Generation should be **completely automated** and available for scripting.
- Artifacts that must be included in the Typesafe Stack should be published to a maven “documentation” repository as documentation artifacts.
The preferred format for all documentation is Typesafe's standard [reStructuredText](http://doc.akka.io/docs/akka/snapshot/dev/documentation.html), compiled using Typesafe's customized [Sphinx](http://sphinx.pocoo.org/)-based documentation generation system, which among other things allows all code in the documentation to be externalized into compiled files and imported into the documentation.
For more info, or for a starting point for new projects, look at the [Typesafe Documentation Sample project](https://github.com/typesafehub/doc-example).
For larger projects that have invested a lot of time and resources into their current documentation and samples scheme (like for example Play), it is understandable that it will take some time to migrate to this new model. In these cases someone from the project needs to take responsibility for manual QA and verification of the documentation and samples.
## Work In Progress
It is ok to work on a public feature branch in the GitHub repository; this can sometimes be useful for early feedback etc. If so, it is preferable to name the branch accordingly. This can be done by either prefixing the name with ``wip-`` as in Work In Progress, or by using hierarchical names like ``wip/..``, ``feature/..`` or ``topic/..``. Either way is fine as long as it is clear that it is work in progress and not ready for merge. This work can temporarily have a lower standard. However, to be merged into master it will have to go through the regular process outlined above, with Pull Request, review etc.
Also, to facilitate both well-formed commits and working together, the ``wip`` and ``feature``/``topic`` identifiers also have special meaning. Any branch labelled with ``wip`` is considered “git-unstable” and may be rebased and have its history rewritten. Any branch with ``feature``/``topic`` in the name is considered “stable” enough for others to depend on when a group is working on a feature.
## Creating Commits And Writing Commit Messages
Follow these guidelines when creating public commits and writing commit messages.
1. If your work spans multiple local commits (for example, if you do safe-point commits while working in a feature branch, or if you work in a branch for a long time doing merges/rebases) then please do not push it all as-is; rewrite the history by squashing the commits into a single big commit for which you write a good commit message (as discussed in the following sections). For more info read this article: [Git Workflow](http://sandofsky.com/blog/git-workflow.html). Every commit should be usable in isolation, cherry-picked etc.
2. The first line should be a descriptive sentence stating what the commit does. It should be possible to fully understand what the commit does by reading just this single line. It is **not ok** to only list the ticket number, write "minor fix" or similar. Include a reference to the ticket number, prefixed with #, at the end of the first line. If the commit is a small fix, then you are done. If not, go to 3.
3. Following the single line description should be a blank line followed by an enumerated list with the details of the commit.
4. Add keywords for your commit (depending on the degree of automation we reach, the list may change over time):
* ``Review by @gituser`` - if you want to notify someone on the team. The others can, and are encouraged to participate.
* ``Fix/Fixing/Fixes/Close/Closing/Refs #ticket`` - if you want to mark the ticket as fixed in the issue tracker (Assembla understands this).
* ``backport to _branch name_`` - if the fix needs to be cherry-picked to another branch (like 2.9.x, 2.10.x, etc)
Example:
Added monadic API to Future. Fixes #2731
* Details 1
* Details 2
* Details 3
##Source style##


@@ -607,8 +607,7 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory
protected def actorFactory: ActorRefFactory = system
protected def typedActor = this
val serialization = SerializationExtension(system)
val settings = system.settings
import system.settings
/**
* Default timeout for typed actor methods with non-void return type
@@ -635,22 +634,17 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory
private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ActorRef): R = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val classLoader: ClassLoader = if (props.loader.nonEmpty) props.loader.get else props.interfaces.headOption.map(_.getClassLoader).orNull //If we have no loader, we arbitrarily take the loader of the first interface
val proxy = Proxy.newProxyInstance(
classLoader,
(props.loader orElse props.interfaces.collectFirst { case any ⇒ any.getClassLoader }).orNull, //If we have no loader, we arbitrarily take the loader of the first interface
props.interfaces.toArray,
new TypedActorInvocationHandler(
this,
actorVar,
if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R]
new TypedActorInvocationHandler(this, actorVar, props.timeout getOrElse DefaultReturnTimeout)).asInstanceOf[R]
proxyVar match {
case null ⇒
actorVar.set(actorRef)
if (proxyVar eq null) {
actorVar set actorRef
proxy
case _ ⇒
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
actorVar.set(actorRef) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
} else {
proxyVar set proxy // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
actorVar set actorRef //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
}
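As a usage aside (a minimal sketch, assuming a hypothetical `Squarer` interface; not part of this commit): the proxy built by `createActorRefProxy` above is what user code receives from the typed actor factory.

```scala
import akka.actor.{ ActorSystem, TypedActor, TypedProps }

trait Squarer {
  def square(i: Int): Int
}
class SquarerImpl extends Squarer {
  def square(i: Int): Int = i * i
}

object TypedActorExample extends App {
  val system = ActorSystem("example")
  // typedActorOf ends up in createActorRefProxy, which builds the JDK dynamic proxy
  val squarer: Squarer =
    TypedActor(system).typedActorOf(TypedProps(classOf[Squarer], new SquarerImpl))
  // a method with a non-void return type blocks up to DefaultReturnTimeout
  println(squarer.square(3))
  TypedActor(system).stop(squarer)
  system.shutdown()
}
```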


@@ -34,8 +34,8 @@ object CircuitBreaker {
*
* @param scheduler Reference to Akka scheduler
* @param maxFailures Maximum number of failures before opening the circuit
* @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
* @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
*/
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(syncExecutionContext)
@@ -48,8 +48,8 @@ object CircuitBreaker {
*
* @param scheduler Reference to Akka scheduler
* @param maxFailures Maximum number of failures before opening the circuit
* @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
* @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
*/
def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
apply(scheduler, maxFailures, callTimeout, resetTimeout)
@@ -71,8 +71,8 @@ object CircuitBreaker {
*
* @param scheduler Reference to Akka scheduler
* @param maxFailures Maximum number of failures before opening the circuit
* @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
* @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
* @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
*/
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
@@ -457,7 +457,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
/**
* Calculate remaining timeout to inform the caller in case a backoff algorithm is useful
*
* @return [[akka.util.Deadline]] to when the breaker will attempt a reset by transitioning to half-open
* @return [[scala.concurrent.duration.Deadline]] to when the breaker will attempt a reset by transitioning to half-open
*/
private def remainingTimeout(): Deadline = get match {
case 0L ⇒ Deadline.now
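As a usage aside (a minimal sketch, not from this diff; `dangerousCall` is an assumed stand-in), the constructor documented above is driven like this:

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker

object BreakerExample extends App {
  val system = ActorSystem("example")
  import system.dispatcher // implicit ExecutionContext for the breaker and the Future

  val breaker = new CircuitBreaker(
    system.scheduler,
    maxFailures = 5,
    callTimeout = 10.seconds, // FiniteDuration, matching the corrected scaladoc
    resetTimeout = 1.minute).onOpen(println("circuit breaker opened"))

  def dangerousCall: String = "ok" // stand-in for a failure-prone operation

  val result: Future[String] = breaker.withCircuitBreaker(Future(dangerousCall))
}
```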


@@ -18,7 +18,7 @@ import java.util.Arrays
* hash, i.e. make sure it is different for different nodes.
*
*/
class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) {
class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) {
import ConsistentHash._
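The change above promotes `virtualNodesFactor` to a public `val`. A small sketch of why that matters (node values assumed): callers such as the heartbeat sender state later in this diff can rebuild a ring with the same factor.

```scala
import akka.routing.ConsistentHash

val ring = ConsistentHash(Seq("a", "b", "c"), virtualNodesFactor = 10)
println(ring.nodeFor("some-key")) // deterministic mapping from key to node
// rebuild with the same factor after membership changes
val grown = ConsistentHash(Seq("a", "b", "c", "d"), ring.virtualNodesFactor)
```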


@@ -181,7 +181,7 @@ private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) ex
}
/**
* For internal use only. Converts Strings to [[scala.concurrent.util.Duration]]
* For internal use only. Converts Strings to [[scala.concurrent.duration.Duration]]
*/
private[camel] object DurationTypeConverter extends TypeConverterSupport {


@@ -78,6 +78,10 @@ akka {
# how often should the node send out heartbeats?
heartbeat-interval = 1s
# Number of member nodes that each member will send heartbeat messages to,
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 5
# defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high


@@ -60,7 +60,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
val selfAddress = system.provider match {
val selfAddress: Address = system.provider match {
case c: ClusterActorRefProvider ⇒ c.transport.address
case other ⇒ throw new ConfigurationException(
"ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]".
@@ -72,7 +72,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
log.info("Cluster Node [{}] - is starting up...", selfAddress)
val failureDetector = {
val failureDetector: FailureDetector = {
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({


@@ -153,8 +153,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
withDispatcher(context.props.dispatcher), name = "publisher")
val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
withDispatcher(context.props.dispatcher), name = "core")
context.actorOf(Props[ClusterHeartbeatDaemon].
withDispatcher(context.props.dispatcher), name = "heartbeat")
context.actorOf(Props[ClusterHeartbeatReceiver].
withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
withDispatcher(context.props.dispatcher), name = "metrics")
@@ -170,26 +170,24 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
import ClusterLeaderAction._
import InternalClusterAction._
import ClusterHeartbeatSender._
import ClusterHeartbeatSender.JoinInProgress
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._
val vclockNode = VectorClock.Node(selfAddress.toString)
val selfHeartbeat = Heartbeat(selfAddress)
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip()
var joinInProgress: Map[Address, Deadline] = Map.empty
var stats = ClusterStats()
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props[ClusterCoreSender].
withDispatcher(UseDispatcher), name = "coreSender")
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
import context.dispatcher
@@ -197,10 +195,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(GossipInterval),
GossipInterval, self, GossipTick)
// start periodic heartbeat to all nodes in cluster
val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval),
HeartbeatInterval, self, HeartbeatTick)
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask = scheduler.schedule(PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval),
UnreachableNodesReaperInterval, self, ReapUnreachableTick)
@@ -221,7 +215,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
override def postStop(): Unit = {
gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
publishStatsTask foreach { _.cancel() }
@@ -239,7 +232,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case msg: GossipEnvelope ⇒ receiveGossip(msg)
case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg)
case GossipTick ⇒ gossip()
case HeartbeatTick ⇒ heartbeat()
case ReapUnreachableTick ⇒ reapUnreachableMembers()
case LeaderActionsTick ⇒ leaderActions()
case PublishStatsTick ⇒ publishInternalStats()
@@ -282,12 +274,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip()
joinInProgress = Map(address -> (Deadline.now + JoinTimeout))
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
publish(localGossip)
heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
context.become(initialized)
if (address == selfAddress)
@@ -506,12 +498,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
else remoteGossip // remote gossip is newer
val newJoinInProgress =
if (joinInProgress.isEmpty) joinInProgress
else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address)
latestGossip = winningGossip seen selfAddress
joinInProgress = newJoinInProgress
// for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).foreach {
@@ -733,27 +720,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
}
def heartbeat(): Unit = {
removeOverdueJoinInProgress()
val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys
val deadline = Deadline.now + HeartbeatInterval
beatTo.foreach { address ⇒ if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) }
}
/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
}
/**
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
def reapUnreachableMembers(): Unit = {
if (!isSingletonCluster && isAvailable) {
// only scrutinize if we are a non-singleton cluster and available


@@ -5,29 +5,47 @@ package akka.cluster
import language.postfixOps
import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props }
import java.security.MessageDigest
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import scala.collection.immutable.SortedSet
import scala.annotation.tailrec
import scala.concurrent.duration._
import java.net.URLEncoder
import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
import akka.cluster.ClusterEvent._
import akka.routing.ConsistentHash
/**
* INTERNAL API
*/
private[akka] object ClusterHeartbeatReceiver {
/**
* Sent at regular intervals for failure detection.
*/
case class Heartbeat(from: Address) extends ClusterMessage
/**
* Tell failure detector at receiving side that it should
* remove the monitoring, because heartbeats will end from
* this node.
*/
case class EndHeartbeat(from: Address) extends ClusterMessage
}
/**
* INTERNAL API.
*
* Receives Heartbeat messages and delegates to Cluster.
* Receives Heartbeat messages and updates failure detector.
* Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized
* to Cluster message after message, but concurrent with other types of messages.
*/
private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging {
private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
import ClusterHeartbeatReceiver._
val failureDetector = Cluster(context.system).failureDetector
def receive = {
case Heartbeat(from) ⇒ failureDetector heartbeat from
case EndHeartbeat(from) ⇒ failureDetector remove from
}
}
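To make the receiver's contract concrete, here is a minimal stand-in for the failure detector (an illustration-only assumption; the real implementation is the accrual failure detector, which additionally computes a suspicion level):

```scala
import akka.actor.Address

// Tracks only which peers are currently heartbeating us.
class PresenceFailureDetectorStub {
  private var monitored = Set.empty[Address]
  def heartbeat(from: Address): Unit = monitored += from // on Heartbeat(from)
  def remove(from: Address): Unit = monitored -= from    // on EndHeartbeat(from)
  def isMonitoring(node: Address): Boolean = monitored contains node
}
```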
@@ -37,69 +55,271 @@ private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogg
*/
private[cluster] object ClusterHeartbeatSender {
/**
*
Command to [[akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]]
* to the other node.
Tell [[akka.cluster.ClusterHeartbeatSender]] that this node has started joining
another node, and heartbeats should be sent unconditionally until it becomes a
member or the deadline is overdue. This is done to be able to detect immediate death
* of the joining node.
* Local only, no need to serialize.
*/
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
case class JoinInProgress(address: Address, deadline: Deadline)
}
/*
* INTERNAL API
*
* This actor is responsible for sending the heartbeat messages to
* other nodes. Netty blocks when sending to broken connections. This actor
* isolates sending to different nodes by using child workers for each target
* a few other nodes that will monitor this node.
*
* Netty blocks when sending to broken connections. This actor
* isolates sending to different nodes by using child actors for each target
address and thereby reduces the risk of irregular heartbeats to healthy
* nodes due to broken connections to other nodes.
*/
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
import ClusterHeartbeatSender._
import ClusterHeartbeatSenderConnection._
import ClusterHeartbeatReceiver._
import InternalClusterAction.HeartbeatTick
val cluster = Cluster(context.system)
import cluster.{ selfAddress, scheduler }
import cluster.settings._
import context.dispatcher
val selfHeartbeat = Heartbeat(selfAddress)
val selfEndHeartbeat = EndHeartbeat(selfAddress)
var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor),
selfAddress.toString, MonitoredByNrOfMembers)
// start periodic heartbeat to other nodes in cluster
val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration],
HeartbeatInterval, self, HeartbeatTick)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
override def postStop(): Unit = {
heartbeatTask.cancel()
cluster.unsubscribe(self)
}
/**
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
val digester = MessageDigest.getInstance("MD5")
/**
* Child name is MD5 hash of the address.
* FIXME Change to URLEncode when ticket #2123 has been fixed
*/
def encodeChildName(name: String): String = {
digester update name.getBytes("UTF-8")
digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString
}
context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
def receive = {
case msg @ SendHeartbeat(heartbeatMsg, to, deadline) ⇒
val workerName = encodeChildName(to.toString)
val worker = context.actorFor(workerName) match {
case HeartbeatTick ⇒ heartbeat()
case s: CurrentClusterState ⇒ reset(s)
case MemberUnreachable(m) ⇒ removeMember(m)
case MemberRemoved(m) ⇒ removeMember(m)
case e: MemberEvent ⇒ addMember(e.member)
case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d)
}
def reset(snapshot: CurrentClusterState): Unit =
state = state.reset(snapshot.members.collect { case m if m.address != selfAddress ⇒ m.address })
def addMember(m: Member): Unit = if (m.address != selfAddress)
state = state addMember m.address
def removeMember(m: Member): Unit = if (m.address != selfAddress)
state = state removeMember m.address
def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
state = state.addJoinInProgress(address, deadline)
def heartbeat(): Unit = {
state = state.removeOverdueJoinInProgress()
def connection(to: Address): ActorRef = {
// URL encoded target address as child actor name
val connectionName = URLEncoder.encode(to.toString, "UTF-8")
context.actorFor(connectionName) match {
case notFound if notFound.isTerminated ⇒
context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName)
context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName)
case child ⇒ child
}
worker ! msg
}
val deadline = Deadline.now + HeartbeatInterval
state.active foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
// When sending heartbeats to a node is stopped, a few `EndHeartbeat` messages are
// sent to notify it that no more heartbeats will be sent.
for ((to, count) ← state.ending) {
val c = connection(to)
c ! SendEndHeartbeat(selfEndHeartbeat, to)
if (count == NumberOfEndHeartbeats) {
state = state.removeEnding(to)
c ! PoisonPill
} else
state = state.increaseEndingCount(to)
}
}
}
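Aside: the child-per-target naming above now relies on plain URL encoding instead of the former MD5 digest. A tiny sketch (sample address string assumed):

```scala
import java.net.URLEncoder

val to = "akka://sys@host:2552" // an Address rendered as a string
val connectionName = URLEncoder.encode(to, "UTF-8")
// yields "akka%3A%2F%2Fsys%40host%3A2552": a legal actor name that,
// unlike an MD5 hash, can still be read and decoded by a human
println(connectionName)
```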
/**
* Responsible for sending [[akka.cluster.Heartbeat]] to one specific address.
* INTERNAL API
*/
private[cluster] object ClusterHeartbeatSenderState {
/**
* Initial, empty state
*/
def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String,
monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers)
/**
* Create a new state based on previous state, and
* keep track of which nodes to stop sending heartbeats to.
*/
private def apply(
old: ClusterHeartbeatSenderState,
consistentHash: ConsistentHash[Address],
all: Set[Address]): ClusterHeartbeatSenderState = {
/**
* Select a few peers that heartbeats will be sent to, i.e. that will
* monitor this node. Try to send heartbeats to same nodes as much
* as possible, but re-balance with consistent hashing algorithm when
* new members are added or removed.
*/
def selectPeers: Set[Address] = {
val allSize = all.size
val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers)
// try more if consistentHash results in same node as already selected
val attemptLimit = nrOfPeers * 2
@tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
if (acc.size == nrOfPeers || n == attemptLimit) acc
else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1)
}
if (nrOfPeers >= allSize) all
else select(Set.empty[Address], 0)
}
val curr = selectPeers
// start ending process for nodes not selected any more
// abort ending process for nodes that have been selected again
val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end)
}
}
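To see the selection behave in isolation, here is a runnable sketch of the same ring-walking idea (the addresses and the `"self"` key are assumptions):

```scala
import scala.annotation.tailrec
import akka.routing.ConsistentHash

val all = Set("a:2551", "b:2552", "c:2553", "d:2554")
val hash = ConsistentHash(all.toSeq, virtualNodesFactor = 10)
val nrOfPeers = 3
val attemptLimit = nrOfPeers * 2 // bounded retries when the ring repeats a node

@tailrec def select(acc: Set[String], n: Int): Set[String] =
  if (acc.size == nrOfPeers || n == attemptLimit) acc
  else select(acc + hash.nodeFor("self" + n), n + 1)

println(select(Set.empty, 0)) // same input gives same peers; small churn on membership changes
```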
/**
* INTERNAL API
*
* Netty blocks when sending to broken connections, and this actor uses
* a configurable circuit breaker to reduce connect attempts to broken
State used by [[akka.cluster.ClusterHeartbeatSender]].
The initial state is created with `empty` in
the companion object, thereafter the state is modified
with the methods, such as `addMember`. It is immutable,
i.e. the methods return new instances.
*/
private[cluster] case class ClusterHeartbeatSenderState private (
consistentHash: ConsistentHash[Address],
selfAddressStr: String,
monitoredByNrOfMembers: Int,
all: Set[Address] = Set.empty,
current: Set[Address] = Set.empty,
ending: Map[Address, Int] = Map.empty,
joinInProgress: Map[Address, Deadline] = Map.empty) {
// FIXME can be disabled as optimization
assertInvariants
private def assertInvariants: Unit = {
val currentAndEnding = current.intersect(ending.keySet)
require(currentAndEnding.isEmpty,
"Same nodes in current and ending not allowed, got [%s]" format currentAndEnding)
val joinInProgressAndAll = joinInProgress.keySet.intersect(all)
require(joinInProgressAndAll.isEmpty,
"Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll)
val currentNotInAll = current -- all
require(currentNotInAll.isEmpty,
"Nodes in current but not in all not allowed, got [%s]" format currentNotInAll)
require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]"
format all)
}
val active: Set[Address] = current ++ joinInProgress.keySet
def reset(nodes: Set[Address]): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ },
consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
all = nodes)
def addMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a)
def removeMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a)
private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
if (joinInProgress contains address)
copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0))
else this
}
def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
if (all contains address) this
else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address)
}
/**
* Cleanup overdue joinInProgress, in case a joining node never
* became member, for some reason.
*/
def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = {
val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
if (overdue.isEmpty) this
else
copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue)
}
def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1)))
}
/**
* INTERNAL API
*/
private[cluster] object ClusterHeartbeatSenderConnection {
import ClusterHeartbeatReceiver._
/**
Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
* [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node.
* Local only, no need to serialize.
*/
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
/**
Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
* [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node.
* Local only, no need to serialize.
*/
case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address)
}
/**
* Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
* and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
*
* This actor exists only because Netty blocks when sending to broken connections,
* and this actor uses a configurable circuit breaker to reduce connect attempts to broken
* connections.
*
* @see ClusterHeartbeatSender
* @see akka.cluster.ClusterHeartbeatSender
*/
private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
extends Actor with ActorLogging {
import ClusterHeartbeatSender._
import ClusterHeartbeatSenderConnection._
val breaker = {
val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings
@@ -110,21 +330,19 @@ private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
}
// make sure it will cleanup when not used any more
context.setReceiveTimeout(30 seconds)
def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
if (!deadline.isOverdue) {
// the CircuitBreaker will measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
// Netty blocks when sending to broken connections, the CircuitBreaker will
// measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
toRef ! heartbeatMsg
if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
} catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
}
case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used
if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
case SendEndHeartbeat(endHeartbeatMsg, _) ⇒
log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
toRef ! endHeartbeatMsg
}
}


@@ -16,14 +16,34 @@ import scala.concurrent.duration.FiniteDuration
class ClusterSettings(val config: Config, val systemName: String) {
import config._
final val FailureDetectorThreshold = getDouble("akka.cluster.failure-detector.threshold")
final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
final val FailureDetectorImplementationClass = getString("akka.cluster.failure-detector.implementation-class")
final val FailureDetectorMinStdDeviation: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
final val FailureDetectorThreshold: Double = {
val x = getDouble("akka.cluster.failure-detector.threshold")
require(x > 0.0, "failure-detector.threshold must be > 0")
x
}
final val FailureDetectorMaxSampleSize: Int = {
val n = getInt("akka.cluster.failure-detector.max-sample-size")
require(n > 0, "failure-detector.max-sample-size must be > 0"); n
}
final val FailureDetectorImplementationClass: String = getString("akka.cluster.failure-detector.implementation-class")
final val FailureDetectorMinStdDeviation: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.min-std-deviation must be > 0"); d
}
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
require(d >= Duration.Zero, "failure-detector.acceptable-heartbeat-pause must be >= 0"); d
}
final val HeartbeatInterval: FiniteDuration = {
val d = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
require(d > Duration.Zero, "failure-detector.heartbeat-interval must be > 0"); d
}
final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
final val MonitoredByNrOfMembers: Int = {
val n = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
require(n > 0, "failure-detector.monitored-by-nr-of-members must be > 0"); n
}
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case AddressFromURIString(addr) ⇒ addr
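Worked through with the defaults that ClusterConfigSpec (further down in this diff) implies — heartbeat-interval = 1 s, acceptable-heartbeat-pause = 3 s — the derived `NumberOfEndHeartbeats` comes out as the test asserts:

```scala
import scala.concurrent.duration._

val HeartbeatInterval = 1.second
val FailureDetectorAcceptableHeartbeatPause = 3.seconds

// Duration / Duration yields a Double, so this is (3.0 / 1.0) + 1 = 4
val NumberOfEndHeartbeats =
  (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
assert(NumberOfEndHeartbeats == 4) // NumberOfEndHeartbeats must be(4)
```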


@@ -196,7 +196,7 @@ private[akka] class ClusterRouteeProvider(
private def selectDeploymentTarget: Option[Address] = {
val currentRoutees = routees
val currentNodes = availbleNodes
val currentNodes = availableNodes
if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) {
None
} else {
@@ -222,7 +222,7 @@ private[akka] class ClusterRouteeProvider(
case a ⇒ a
}
private[routing] def availbleNodes: SortedSet[Address] = {
private[routing] def availableNodes: SortedSet[Address] = {
import Member.addressOrdering
val currentNodes = nodes
if (currentNodes.isEmpty && settings.allowLocalRoutees)
@@ -236,11 +236,11 @@ private[akka] class ClusterRouteeProvider(
private[routing] var nodes: SortedSet[Address] = {
import Member.addressOrdering
cluster.readView.members.collect {
case m if isAvailble(m) ⇒ m.address
case m if isAvailable(m) ⇒ m.address
}
}
private[routing] def isAvailble(m: Member): Boolean = {
private[routing] def isAvailable(m: Member): Boolean = {
m.status == MemberStatus.Up && (settings.allowLocalRoutees || m.address != cluster.selfAddress)
}
@@ -271,10 +271,10 @@ private[akka] class ClusterRouterActor extends Router {
override def routerReceive: Receive = {
case s: CurrentClusterState ⇒
import Member.addressOrdering
routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailble(m) ⇒ m.address }
routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailable(m) ⇒ m.address }
routeeProvider.createRoutees()
case m: MemberEvent if routeeProvider.isAvailble(m.member) ⇒
case m: MemberEvent if routeeProvider.isAvailable(m.member) ⇒
routeeProvider.nodes += m.member.address
// createRoutees will create routees based on
// totalInstances and maxInstancesPerNode


@@ -39,7 +39,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
gossip-interval = 500 ms
auto-join = off
auto-down = on
failure-detector.acceptable-heartbeat-pause = 10s
failure-detector.acceptable-heartbeat-pause = 5s
publish-stats-interval = 0 s # always, when it happens
}
akka.event-handlers = ["akka.testkit.TestEventListener"]
@@ -54,7 +54,9 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
akka.scheduler.tick-duration = 33 ms
akka.remote.log-remote-lifecycle-events = off
akka.remote.netty.execution-pool-size = 4
#akka.remote.netty.reconnection-time-window = 1s
#akka.remote.netty.reconnection-time-window = 10s
akka.remote.netty.read-timeout = 5s
akka.remote.netty.write-timeout = 5s
akka.remote.netty.backoff-timeout = 500ms
akka.remote.netty.connection-timeout = 500ms


@@ -4,10 +4,12 @@
package akka.cluster
import language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.duration._
case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
val controller = role("controller")
@@ -69,7 +71,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
val leaderAddress = address(leader)
enterBarrier("before-shutdown" + n)
testConductor.shutdown(leader, 0)
enterBarrier("after-shutdown" + n, "after-down" + n, "completed" + n)
enterBarrier("after-shutdown" + n, "after-unavailable" + n, "after-down" + n, "completed" + n)
case `leader` ⇒
enterBarrier("before-shutdown" + n, "after-shutdown" + n)
@@ -78,15 +80,25 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
case `aUser` ⇒
val leaderAddress = address(leader)
enterBarrier("before-shutdown" + n, "after-shutdown" + n)
// detect failure
markNodeAsUnavailable(leaderAddress)
awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress))
enterBarrier("after-unavailable" + n)
// user marks the shutdown leader as DOWN
cluster.down(leaderAddress)
enterBarrier("after-down" + n, "completed" + n)
markNodeAsUnavailable(leaderAddress)
case _ if remainingRoles.contains(myself) ⇒
// remaining cluster nodes, not shutdown
enterBarrier("before-shutdown" + n, "after-shutdown" + n, "after-down" + n)
val leaderAddress = address(leader)
enterBarrier("before-shutdown" + n, "after-shutdown" + n)
awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.address == leaderAddress))
enterBarrier("after-unavailable" + n)
enterBarrier("after-down" + n)
awaitUpConvergence(currentRoles.size - 1)
val nextExpectedLeader = remainingRoles.head
clusterView.isLeader must be(myself == nextExpectedLeader)
@@ -97,12 +109,12 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
}
}
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in within(20 seconds) {
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0)
enterBarrier("after-2")
}
"be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in {
"be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in within(20 seconds) {
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1)
enterBarrier("after-3")
}


@@ -28,6 +28,8 @@ class ClusterConfigSpec extends AkkaSpec {
PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second)
HeartbeatInterval must be(1 second)
NumberOfEndHeartbeats must be(4)
MonitoredByNrOfMembers must be(5)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
PublishStatsInterval must be(10 second)


@@ -0,0 +1,107 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.routing.ConsistentHash
import scala.concurrent.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
val selfAddress = Address("akka", "sys", "myself", 2552)
val aa = Address("akka", "sys", "aa", 2552)
val bb = Address("akka", "sys", "bb", 2552)
val cc = Address("akka", "sys", "cc", 2552)
val dd = Address("akka", "sys", "dd", 2552)
val ee = Address("akka", "sys", "ee", 2552)
val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10),
selfAddress.toString, 3)
"A ClusterHeartbeatSenderState" must {
"return empty active set when no nodes" in {
emptyState.active.isEmpty must be(true)
}
"include joinInProgress in active set" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds)
s.joinInProgress.keySet must be(Set(aa))
s.active must be(Set(aa))
}
"remove joinInProgress from active set after removeOverdueJoinInProgress" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress()
s.joinInProgress must be(Map.empty)
s.active must be(Set.empty)
s.ending must be(Map(aa -> 0))
}
"remove joinInProgress after reset" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb))
s.joinInProgress must be(Map.empty)
}
"remove joinInProgress after addMember" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa)
s.joinInProgress must be(Map.empty)
}
"remove joinInProgress after removeMember" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa)
s.joinInProgress must be(Map.empty)
s.ending must be(Map(aa -> 0))
}
"remove from ending after addJoinInProgress" in {
val s = emptyState.reset(Set(aa, bb)).removeMember(aa)
s.ending must be(Map(aa -> 0))
val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds)
s2.joinInProgress.keySet must be(Set(aa))
s2.ending must be(Map.empty)
}
"include nodes from reset in active set" in {
val nodes = Set(aa, bb, cc)
val s = emptyState.reset(nodes)
s.all must be(nodes)
s.current must be(nodes)
s.ending must be(Map.empty)
s.active must be(nodes)
}
"limit current nodes to monitoredByNrOfMembers when adding members" in {
val nodes = Set(aa, bb, cc, dd)
val s = nodes.foldLeft(emptyState) { _ addMember _ }
s.all must be(nodes)
s.current.size must be(3)
s.addMember(ee).current.size must be(3)
}
"move meber to ending set when removing member" in {
val nodes = Set(aa, bb, cc, dd, ee)
val s = emptyState.reset(nodes)
s.ending must be(Map.empty)
val included = s.current.head
val s2 = s.removeMember(included)
s2.ending must be(Map(included -> 0))
s2.current must not contain (included)
val s3 = s2.addMember(included)
s3.current must contain(included)
s3.ending.keySet must not contain (included)
}
"increase ending count correctly" in {
val s = emptyState.reset(Set(aa)).removeMember(aa)
s.ending must be(Map(aa -> 0))
val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa)
s2.ending must be(Map(aa -> 2))
}
}
}


@@ -8,7 +8,7 @@ package docs.circuitbreaker;
import akka.actor.UntypedActor;
import scala.concurrent.Future;
import akka.event.LoggingAdapter;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.pattern.CircuitBreaker;
import akka.event.Logging;


@@ -5,15 +5,15 @@
package docs.duration;
//#import
import scala.concurrent.util.Duration;
import scala.concurrent.util.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Deadline;
//#import
class Java {
public void demo() {
//#dsl
final Duration fivesec = Duration.create(5, "seconds");
final Duration threemillis = Duration.parse("3 millis");
final Duration threemillis = Duration.create("3 millis");
final Duration diff = fivesec.minus(threemillis);
assert diff.lt(fivesec);
assert Duration.Zero().lt(Duration.Inf());


@@ -3,6 +3,8 @@
Developer Guidelines
====================
First read: `The Akka Contributor Guidelines <https://github.com/akka/akka/blob/master/CONTRIBUTING.md>`_ .
Code Style
----------


@@ -189,6 +189,11 @@ external resource, which may also be one of its own children. If a third party
terminates a child by way of the ``system.stop(child)`` method or sending a
:class:`PoisonPill`, the supervisor might well be affected.
.. warning::
DeathWatch for Akka Remote does not (yet) get triggered by connection failures.
This feature may be added in a future release of Akka Remoting.
One-For-One Strategy vs. All-For-One Strategy
---------------------------------------------


@@ -14,7 +14,7 @@ import akka.actor.Terminated;
import akka.actor.UntypedActor;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe;
@@ -41,7 +41,7 @@ public class FaultHandlingTestBase {
//#strategy
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.parse("1 minute"),
new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
@@ -81,7 +81,7 @@ public class FaultHandlingTestBase {
//#strategy2
private static SupervisorStrategy strategy = new OneForOneStrategy(10,
Duration.parse("1 minute"),
Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {


@@ -6,18 +6,23 @@ package docs.actor;
//#receive-timeout
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
// To set an initial delay
getContext().setReceiveTimeout(Duration.create("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
// To set in a response to a message
getContext().setReceiveTimeout(Duration.create("10 seconds"));
getSender().tell("Hello world", getSelf());
} else if (message == ReceiveTimeout.getInstance()) {
// To turn it off
getContext().setReceiveTimeout(Duration.Undefined());
throw new RuntimeException("received timeout");
} else {
unhandled(message);


@@ -5,7 +5,7 @@ package docs.actor;
//#imports1
import akka.actor.Props;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
//#imports1


@@ -11,7 +11,7 @@ import akka.japi.*;
import akka.dispatch.Futures;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
//#imports


@@ -14,7 +14,7 @@ import scala.concurrent.Future;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
//#import-future
@@ -35,7 +35,7 @@ import akka.actor.Terminated;
import static akka.pattern.Patterns.gracefulStop;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.pattern.AskTimeoutException;
//#import-gracefulStop
@@ -44,7 +44,7 @@ import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import scala.concurrent.Future;
import akka.dispatch.Futures;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
@@ -192,7 +192,7 @@ public class UntypedActorDocTestBase {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef myActor = system.actorOf(new Props(WatchActor.class));
Future<Object> future = Patterns.ask(myActor, "kill", 1000);
assert Await.result(future, Duration.parse("1 second")).equals("finished");
assert Await.result(future, Duration.create("1 second")).equals("finished");
system.shutdown();
}


@@ -13,7 +13,7 @@ import java.util.Map;
import akka.actor.*;
import akka.dispatch.Mapper;
import akka.japi.Function;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
import akka.event.Logging;
import akka.event.LoggingAdapter;
@@ -62,7 +62,7 @@ public class FaultHandlingDocSample {
public void preStart() {
// If we don't get any progress within 15 seconds then the service
// is unavailable
getContext().setReceiveTimeout(Duration.parse("15 seconds"));
getContext().setReceiveTimeout(Duration.create("15 seconds"));
}
public void onReceive(Object msg) {
@@ -237,7 +237,7 @@ public class FaultHandlingDocSample {
// Restart the storage child when StorageException is thrown.
// After 3 restarts within 5 seconds it will be stopped.
private static SupervisorStrategy strategy = new OneForOneStrategy(3,
Duration.parse("5 seconds"), new Function<Throwable, Directive>() {
Duration.create("5 seconds"), new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof StorageException) {


@@ -8,8 +8,8 @@ package docs.camel;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import static java.util.concurrent.TimeUnit.SECONDS;
//#CamelActivation


@@ -2,8 +2,8 @@ package docs.camel;
//#Consumer4
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;


@@ -9,7 +9,7 @@ import akka.actor.AbstractExtensionId;
import akka.actor.ExtensionIdProvider;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit;


@@ -12,7 +12,7 @@ import akka.util.Timeout;
//#imports1
//#imports2
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.japi.Function;
import java.util.concurrent.Callable;
import static akka.dispatch.Futures.future;
@@ -113,7 +113,7 @@ public class FutureDocTestBase {
return "Hello" + "World";
}
}, system.dispatcher());
String result = (String) Await.result(f, Duration.create(1, SECONDS));
String result = (String) Await.result(f, Duration.create(5, SECONDS));
//#future-eval
assertEquals("HelloWorld", result);
}
@ -135,7 +135,7 @@ public class FutureDocTestBase {
}
}, ec);
int result = Await.result(f2, Duration.create(1, SECONDS));
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
//#map
}
@ -159,7 +159,7 @@ public class FutureDocTestBase {
}, ec);
//#map2
int result = Await.result(f2, Duration.create(1, SECONDS));
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
}
@ -183,7 +183,7 @@ public class FutureDocTestBase {
}, ec);
//#map3
int result = Await.result(f2, Duration.create(1, SECONDS));
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
}
@ -209,7 +209,7 @@ public class FutureDocTestBase {
}, ec);
//#flat-map
int result = Await.result(f2, Duration.create(1, SECONDS));
int result = Await.result(f2, Duration.create(5, SECONDS));
assertEquals(10, result);
}
@ -238,7 +238,7 @@ public class FutureDocTestBase {
}
}, ec);
long result = Await.result(futureSum, Duration.create(1, SECONDS));
long result = Await.result(futureSum, Duration.create(5, SECONDS));
//#sequence
assertEquals(3L, result);
}
@ -262,7 +262,7 @@ public class FutureDocTestBase {
}, ec);
//Returns the sequence of strings as upper case
Iterable<String> result = Await.result(futureResult, Duration.create(1, SECONDS));
Iterable<String> result = Await.result(futureResult, Duration.create(5, SECONDS));
assertEquals(Arrays.asList("A", "B", "C"), result);
//#traverse
}
@ -286,7 +286,7 @@ public class FutureDocTestBase {
return r + t; //Just concatenate
}
}, ec);
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
String result = Await.result(resultFuture, Duration.create(5, SECONDS));
//#fold
assertEquals("ab", result);
@ -310,7 +310,7 @@ public class FutureDocTestBase {
}
}, ec);
Object result = Await.result(resultFuture, Duration.create(1, SECONDS));
Object result = Await.result(resultFuture, Duration.create(5, SECONDS));
//#reduce
assertEquals("ab", result);
@ -326,10 +326,10 @@ public class FutureDocTestBase {
Future<String> otherFuture = Futures.failed(
new IllegalArgumentException("Bang!"));
//#failed
Object result = Await.result(future, Duration.create(1, SECONDS));
Object result = Await.result(future, Duration.create(5, SECONDS));
assertEquals("Yay!", result);
Throwable result2 = Await.result(otherFuture.failed(),
Duration.create(1, SECONDS));
Duration.create(5, SECONDS));
assertEquals("Bang!", result2.getMessage());
}
@ -399,7 +399,7 @@ public class FutureDocTestBase {
throw problem;
}
}, ec);
int result = Await.result(future, Duration.create(1, SECONDS));
int result = Await.result(future, Duration.create(5, SECONDS));
assertEquals(result, 0);
//#recover
}
@ -425,7 +425,7 @@ public class FutureDocTestBase {
throw problem;
}
}, ec);
int result = Await.result(future, Duration.create(1, SECONDS));
int result = Await.result(future, Duration.create(5, SECONDS));
assertEquals(result, 0);
//#try-recover
}
@ -497,7 +497,7 @@ public class FutureDocTestBase {
}
}, ec);
String result = Await.result(future3, Duration.create(1, SECONDS));
String result = Await.result(future3, Duration.create(5, SECONDS));
assertEquals("foo bar", result);
//#zip
}
@ -509,7 +509,7 @@ public class FutureDocTestBase {
Future<String> future3 = Futures.successful("bar");
// Will have "bar" in this case
Future<String> future4 = future1.fallbackTo(future2).fallbackTo(future3);
String result = Await.result(future4, Duration.create(1, SECONDS));
String result = Await.result(future4, Duration.create(5, SECONDS));
assertEquals("bar", result);
//#fallback-to
}


@ -19,7 +19,7 @@ import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.OneForOneStrategy;
@ -68,7 +68,7 @@ public class CustomRouterDocTestBase {
public void demonstrateSupervisor() {
//#supervision
final SupervisorStrategy strategy =
new OneForOneStrategy(5, Duration.parse("1 minute"),
new OneForOneStrategy(5, Duration.create("1 minute"),
new Class<?>[] { Exception.class });
final ActorRef router = system.actorOf(new Props(MyActor.class)
.withRouter(new RoundRobinRouter(5).withSupervisorStrategy(strategy)));


@ -11,7 +11,7 @@ import akka.routing.SmallestMailboxRouter;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.Await;


@ -0,0 +1,191 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.pattern;
import akka.actor.*;
import akka.testkit.*;
import akka.testkit.TestEvent.Mute;
import akka.testkit.TestEvent.UnMute;
import org.junit.*;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class SchedulerPatternTest {
static ActorSystem system;
@BeforeClass
public static void setUp() {
system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf());
}
@AfterClass
public static void tearDown() {
system.shutdown();
}
static
//#schedule-constructor
public class ScheduleInConstructor extends UntypedActor {
private final Cancellable tick = getContext().system().scheduler().schedule(
Duration.create(500, TimeUnit.MILLISECONDS),
Duration.create(1000, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
//#schedule-constructor
// this variable and constructor are declared here so they do not show up in the docs
final ActorRef target;
public ScheduleInConstructor(ActorRef target) {
this.target = target;
}
//#schedule-constructor
@Override
public void postStop() {
tick.cancel();
}
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("tick")) {
// do something useful here
//#schedule-constructor
target.tell(message, getSelf());
//#schedule-constructor
}
//#schedule-constructor
else if (message.equals("restart")) {
throw new ArithmeticException();
}
//#schedule-constructor
else {
unhandled(message);
}
}
}
//#schedule-constructor
static
//#schedule-receive
public class ScheduleInReceive extends UntypedActor {
//#schedule-receive
// this variable and constructor are declared here so they do not show up in the docs
final ActorRef target;
public ScheduleInReceive(ActorRef target) {
this.target = target;
}
//#schedule-receive
@Override
public void preStart() {
getContext().system().scheduler().scheduleOnce(
Duration.create(500, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
}
// override postRestart so we don't call preStart and schedule a new message
@Override
public void postRestart(Throwable reason) {
}
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("tick")) {
// send another periodic tick after the specified delay
getContext().system().scheduler().scheduleOnce(
Duration.create(1000, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
// do something useful here
//#schedule-receive
target.tell(message, getSelf());
//#schedule-receive
}
//#schedule-receive
else if (message.equals("restart")) {
throw new ArithmeticException();
}
//#schedule-receive
else {
unhandled(message);
}
}
}
//#schedule-receive
@Test
@Ignore // no way to tag this as timing sensitive
public void scheduleInConstructor() {
new TestSchedule(system) {{
final JavaTestKit probe = new JavaTestKit(system);
final Props props = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new ScheduleInConstructor(probe.getRef());
}
});
testSchedule(probe, props, duration("3000 millis"), duration("2000 millis"));
}};
}
@Test
@Ignore // no way to tag this as timing sensitive
public void scheduleInReceive() {
new TestSchedule(system) {{
final JavaTestKit probe = new JavaTestKit(system);
final Props props = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new ScheduleInReceive(probe.getRef());
}
});
testSchedule(probe, props, duration("3000 millis"), duration("2500 millis"));
}};
}
public static class TestSchedule extends JavaTestKit {
private ActorSystem system;
public TestSchedule(ActorSystem system) {
super(system);
this.system = system;
}
public void testSchedule(final JavaTestKit probe, Props props,
FiniteDuration startDuration,
FiniteDuration afterRestartDuration) {
Iterable<akka.testkit.EventFilter> filter =
Arrays.asList(new akka.testkit.EventFilter[]{
(akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)});
try {
system.eventStream().publish(new Mute(filter));
final ActorRef actor = system.actorOf(props);
new Within(startDuration) {
protected void run() {
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
}
};
actor.tell("restart", getRef());
new Within(afterRestartDuration) {
protected void run() {
probe.expectMsgEquals("tick");
probe.expectMsgEquals("tick");
}
};
system.stop(actor);
}
finally {
system.eventStream().publish(new UnMute(filter));
}
}
}
}


@ -26,7 +26,7 @@ import akka.testkit.TestActor;
import akka.testkit.TestActor.AutoPilot;
import akka.testkit.TestActorRef;
import akka.testkit.JavaTestKit;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
public class TestKitDocTest {


@ -14,7 +14,7 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.JavaTestKit;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
public class TestKitSampleTest {


@ -30,7 +30,7 @@ import akka.actor.UntypedActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.concurrent.util.Duration;
import scala.concurrent.duration.Duration;
import akka.serialization.SerializationExtension;
import akka.serialization.Serialization;
import java.io.Serializable;


@ -17,6 +17,37 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library
You might find some of the patterns described in the Scala chapter of
:ref:`howto-scala` useful even though the example code is written in Scala.
Scheduling Periodic Messages
============================
This pattern describes how to schedule periodic messages to yourself in two different
ways.
The first way is to set up periodic message scheduling in the constructor of the actor
and cancel the scheduled sending in ``postStop``; otherwise a restart would leave multiple
scheduled message sends registered to the same actor.
.. note::
With this approach the periodic message schedule is recreated together with the actor on restarts.
This also means that the interval between two tick messages may drift across a restart,
depending on when the restart happens relative to the last tick and on the length of the
initial delay. In the worst case the gap is ``interval`` plus ``initialDelay``.
.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-constructor
The second variant sets up an initial one-shot message send in the ``preStart`` method
of the actor; then, when the actor receives this message, it schedules the next one-shot
send. You also have to override ``postRestart`` so that a restart does not call ``preStart``
and schedule the initial message send again.
.. note::
With this approach we won't fill up the mailbox with tick messages if the actor is
under pressure, but only schedule a new tick message when we have seen the previous one.
.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-receive
Template Pattern
================
@ -33,4 +64,3 @@ This is an especially nice pattern, since it does even come with some empty exam
Spread the word: this is the easiest way to get famous!
Please keep this pattern at the end of this file.


@ -194,8 +194,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.4</version>
<scope>runtime</scope>
<version>1.0.7</version>
</dependency>
You need to enable the Slf4jEventHandler in the 'event-handlers' element in
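For reference, a minimal ``application.conf`` sketch enabling the handler could look like
this (the ``loglevel`` setting is an illustrative assumption; only the event handler entry
is required):

.. code-block:: none

  akka {
    # route Akka logging through SLF4J, backed here by Logback
    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
    loglevel = "DEBUG"
  }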


@ -431,13 +431,20 @@ defaults to a 'dead-letter' actor ref.
getSender().tell(result); // will have dead-letter actor as default
}
Initial receive timeout
=======================
Receive timeout
===============
A timeout mechanism can be used to receive a message when no message has been
received within a certain time. To receive this timeout you have to set the
``receiveTimeout`` property and declare handling for the ``ReceiveTimeout``
message.
The `UntypedActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which
the sending of a `ReceiveTimeout` message is triggered.
When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message.
1 millisecond is the minimum supported timeout.
Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after
another message was enqueued; hence it is **not guaranteed** that upon reception of the receive
timeout there must have been an idle period beforehand as configured via this method.
Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity
periods). Pass in `Duration.Undefined` to switch off this feature.
.. includecode:: code/docs/actor/MyReceivedTimeoutUntypedActor.java#receive-timeout
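A minimal sketch of such an actor (a hypothetical ``TimeoutSketchActor``, not the included
example file) could look like this:

.. code-block:: java

  import akka.actor.ReceiveTimeout;
  import akka.actor.UntypedActor;
  import scala.concurrent.duration.Duration;

  public class TimeoutSketchActor extends UntypedActor {
    @Override
    public void preStart() {
      // trigger a ReceiveTimeout message after 30 seconds of inactivity
      getContext().setReceiveTimeout(Duration.create("30 seconds"));
    }

    @Override
    public void onReceive(Object message) throws Exception {
      if (message instanceof ReceiveTimeout) {
        // switch the timeout off again before escalating
        getContext().setReceiveTimeout(Duration.Undefined());
        throw new RuntimeException("Receive timed out");
      } else {
        unhandled(message);
      }
    }
  }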


@ -203,17 +203,17 @@ v2.0 Scala::
v2.1 Scala::
val router2 = system.actorOf(Props[ExampleActor1].withRouter(
val router2 = system.actorOf(Props.empty.withRouter(
RoundRobinRouter(routees = routees)))
v2.0 Java::
ActorRef router2 = system.actorOf(new Props(ExampleActor.class).withRouter(
ActorRef router2 = system.actorOf(new Props().withRouter(
RoundRobinRouter.create(routees)));
v2.1 Java::
ActorRef router2 = system.actorOf(new Props().withRouter(
ActorRef router2 = system.actorOf(Props.empty().withRouter(
RoundRobinRouter.create(routees)));
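Put together, a v2.1 Java sketch could look like this (``ExampleActor`` and the
construction of the routee list are illustrative assumptions):

.. code-block:: java

  ActorRef r1 = system.actorOf(new Props(ExampleActor.class));
  ActorRef r2 = system.actorOf(new Props(ExampleActor.class));
  Iterable<ActorRef> routees = Arrays.asList(r1, r2);
  // the router Props no longer needs to carry the routee class
  ActorRef router2 = system.actorOf(Props.empty().withRouter(
      RoundRobinRouter.create(routees)));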
Props: Function-based creation


@ -549,13 +549,20 @@ defaults to a 'dead-letter' actor ref.
val result = process(request)
sender ! result // will have dead-letter actor as default
Initial receive timeout
=======================
Receive timeout
===============
A timeout mechanism can be used to receive a message when no message has been
received within a certain time. To receive this timeout you have to set the
``receiveTimeout`` property and declare a case handling the ``ReceiveTimeout``
object.
The `ActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which
the sending of a `ReceiveTimeout` message is triggered.
When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message.
1 millisecond is the minimum supported timeout.
Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after
another message was enqueued; hence it is **not guaranteed** that upon reception of the receive
timeout there must have been an idle period beforehand as configured via this method.
Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity
periods). Pass in `Duration.Undefined` to switch off this feature.
.. includecode:: code/docs/actor/ActorDocSpec.scala#receive-timeout


@ -270,10 +270,16 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
import akka.actor.ReceiveTimeout
import scala.concurrent.duration._
class MyActor extends Actor {
// To set an initial receive timeout
context.setReceiveTimeout(30 milliseconds)
def receive = {
case "Hello" //...
case ReceiveTimeout throw new RuntimeException("received timeout")
case "Hello"
// To set in a response to a message
context.setReceiveTimeout(100 milliseconds)
case ReceiveTimeout
// To turn it off
context.setReceiveTimeout(Duration.Undefined)
throw new RuntimeException("Receive timed out")
}
}
//#receive-timeout


@ -0,0 +1,99 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.pattern
import language.postfixOps
import akka.actor.{ Props, ActorRef, Actor }
import scala.concurrent.duration._
import akka.testkit.{ TimingTest, AkkaSpec, filterException }
import docs.pattern.SchedulerPatternSpec.{ ScheduleInConstructor, ScheduleInReceive }
object SchedulerPatternSpec {
//#schedule-constructor
class ScheduleInConstructor extends Actor {
import context.dispatcher
val tick =
context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")
//#schedule-constructor
// this var and constructor are declared here so they do not show up in the docs
var target: ActorRef = null
def this(target: ActorRef) = { this(); this.target = target }
//#schedule-constructor
override def postStop() = tick.cancel()
def receive = {
case "tick"
// do something useful here
//#schedule-constructor
target ! "tick"
case "restart"
throw new ArithmeticException
//#schedule-constructor
}
}
//#schedule-constructor
//#schedule-receive
class ScheduleInReceive extends Actor {
import context._
//#schedule-receive
// this var and constructor are declared here so they do not show up in the docs
var target: ActorRef = null
def this(target: ActorRef) = { this(); this.target = target }
//#schedule-receive
override def preStart() =
system.scheduler.scheduleOnce(500 millis, self, "tick")
// override postRestart so we don't call preStart and schedule a new message
override def postRestart(reason: Throwable) = {}
def receive = {
case "tick"
// send another periodic tick after the specified delay
system.scheduler.scheduleOnce(1000 millis, self, "tick")
// do something useful here
//#schedule-receive
target ! "tick"
case "restart"
throw new ArithmeticException
//#schedule-receive
}
}
//#schedule-receive
}
class SchedulerPatternSpec extends AkkaSpec {
def testSchedule(actor: ActorRef, startDuration: FiniteDuration,
afterRestartDuration: FiniteDuration) = {
filterException[ArithmeticException] {
within(startDuration) {
expectMsg("tick")
expectMsg("tick")
expectMsg("tick")
}
actor ! "restart"
within(afterRestartDuration) {
expectMsg("tick")
expectMsg("tick")
}
system.stop(actor)
}
}
"send periodic ticks from the constructor" taggedAs TimingTest in {
testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))),
3000 millis, 2000 millis)
}
"send ticks from the preStart and receive" taggedAs TimingTest in {
testSchedule(system.actorOf(Props(new ScheduleInReceive(testActor))),
3000 millis, 2500 millis)
}
}


@ -111,6 +111,37 @@ This is where the Spider pattern comes in."
The pattern is described `Discovering Message Flows in Actor System with the Spider Pattern <http://letitcrash.com/post/30585282971/discovering-message-flows-in-actor-systems-with-the>`_.
Scheduling Periodic Messages
============================
This pattern describes how to schedule periodic messages to yourself in two different
ways.
The first way is to set up periodic message scheduling in the constructor of the actor
and cancel the scheduled sending in ``postStop``; otherwise a restart would leave multiple
scheduled message sends registered to the same actor.
.. note::
With this approach the periodic message schedule is recreated together with the actor on restarts.
This also means that the interval between two tick messages may drift across a restart,
depending on when the restart happens relative to the last tick and on the length of the
initial delay. In the worst case the gap is ``interval`` plus ``initialDelay``.
.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor
The second variant sets up an initial one-shot message send in the ``preStart`` method
of the actor; then, when the actor receives this message, it schedules the next one-shot
send. You also have to override ``postRestart`` so that a restart does not call ``preStart``
and schedule the initial message send again.
.. note::
With this approach we won't fill up the mailbox with tick messages if the actor is
under pressure, but only schedule a new tick message when we have seen the previous one.
.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-receive
Template Pattern
================
@ -127,4 +158,3 @@ This is an especially nice pattern, since it does even come with some empty exam
Spread the word: this is the easiest way to get famous!
Please keep this pattern at the end of this file.


@ -232,7 +232,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4
.. code-block:: scala
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "runtime"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.0.7"
You need to enable the Slf4jEventHandler in the 'event-handlers' element in


@ -93,7 +93,7 @@ object LogRoleReplace extends ClipboardOwner {
class LogRoleReplace {
private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started with address \[akka://.*@([\w\-\.]+):([0-9]+)\]""".r
private val ColorCode = """\[[0-9]+m"""
private val ColorCode = """\u001B?\[[0-9]+m"""
private var replacements: Map[String, String] = Map.empty


@ -65,7 +65,7 @@ object AkkaBuild extends Build {
generatePdf in Sphinx <<= generatePdf in Sphinx in LocalProject(docs.id) map identity
),
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, /*zeroMQ,*/ kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib)
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib)
)
lazy val actor = Project(
@ -215,15 +215,15 @@ object AkkaBuild extends Build {
)
)
// lazy val zeroMQ = Project(
// id = "akka-zeromq",
// base = file("akka-zeromq"),
// dependencies = Seq(actor, testkit % "test;test->test"),
// settings = defaultSettings ++ OSGi.zeroMQ ++ Seq(
// libraryDependencies ++= Dependencies.zeroMQ,
// previousArtifact := akkaPreviousArtifact("akka-zeromq")
// )
// )
lazy val zeroMQ = Project(
id = "akka-zeromq",
base = file("akka-zeromq"),
dependencies = Seq(actor, testkit % "test;test->test"),
settings = defaultSettings ++ OSGi.zeroMQ ++ Seq(
libraryDependencies ++= Dependencies.zeroMQ,
previousArtifact := akkaPreviousArtifact("akka-zeromq")
)
)
lazy val kernel = Project(
id = "akka-kernel",
@ -351,7 +351,7 @@ object AkkaBuild extends Build {
id = "akka-docs",
base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test",
remote, cluster, slf4j, agent, dataflow, transactor, fileMailbox, /*zeroMQ,*/ camel, osgi, osgiAries),
remote, cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries),
settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
sourceDirectory in Sphinx <<= baseDirectory / "rst",
sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" },
@ -648,7 +648,45 @@ object AkkaBuild extends Build {
// Dependencies
object Dependencies {
import Dependency._
object Compile {
// Compile
val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2
val config = "com.typesafe" % "config" % "0.6.0" // ApacheV2
val netty = "io.netty" % "netty" % "3.5.8.Final" // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD
val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala)
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.2" // MIT
val zeroMQClient = "org.zeromq" % "zeromq-scala-binding_2.10.0-M7" % "0.0.6" // ApacheV2
val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2
val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2
val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2
// Camel Sample
val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2
// Test
object Test {
val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2
val commonsIo = "commons-io" % "commons-io" % "2.0.1" % "test" // ApacheV2
val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0
val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" % "test" // EPL 1.0 / LGPL 2.1
val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT
val scalatest = "org.scalatest" % "scalatest" % "1.8" % "test" cross CrossVersion.full // ApacheV2
val scalacheck = "org.scalacheck" % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD
val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2
val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2
val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2
val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2
val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT
}
}
import Compile._
val actor = Seq(config)
@ -693,38 +731,3 @@ object Dependencies {
val multiNodeSample = Seq(Test.scalatest)
}
object Dependency {
// Compile
val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2
val config = "com.typesafe" % "config" % "0.5.2" // ApacheV2
val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD
val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala)
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT
val zeroMQClient = "org.zeromq" % "zeromq-scala-binding" % "0.0.6" cross CrossVersion.full // ApacheV2
val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2
val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2
val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2
// Camel Sample
val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2
// Test
object Test {
val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2
val commonsIo = "commons-io" % "commons-io" % "2.0.1" % "test" // ApacheV2
val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0
val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "test" // EPL 1.0 / LGPL 2.1
val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT
val scalatest = "org.scalatest" % "scalatest" % "1.8" % "test" cross CrossVersion.full // ApacheV2
val scalacheck = "org.scalacheck" % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD
val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2
val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2
val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2
val log4j = "log4j" % "log4j" % "1.2.14" % "test" // ApacheV2
val junitIntf = "com.novocode" % "junit-interface" % "0.8" % "test" // MIT
}
}