Merge branch 'master' into wip-cluster-docs-patriknw

Conflicts:
	project/AkkaBuild.scala
This commit is contained in:
Patrik Nordwall 2012-09-20 10:40:08 +02:00
commit 9423d37da9
162 changed files with 2244 additions and 1057 deletions

View file

@ -64,6 +64,7 @@ public class JavaExtension {
@Test
public void mustBeAccessible() {
assertTrue(system.hasExtension((TestExtensionId.TestExtensionProvider)));
assertSame(system.extension(TestExtensionId.TestExtensionProvider).system, system);
assertSame(TestExtensionId.TestExtensionProvider.apply(system).system, system);
}

View file

@ -70,7 +70,7 @@ object ActorSystemSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") with ImplicitSender {
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension"]""") with ImplicitSender {
"An ActorSystem" must {
@ -95,9 +95,10 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
}
"support extensions" in {
// TestExtension is configured and should be loaded at startup
system.hasExtension(TestExtension) must be(true)
TestExtension(system).system must be === system
system.extension(TestExtension).system must be === system
system.hasExtension(TestExtension) must be(true)
}
"run termination callbacks in order" in {

View file

@ -35,6 +35,9 @@ object DeployerSpec {
router = scatter-gather
within = 2 seconds
}
/service-consistent-hashing {
router = consistent-hashing
}
/service-resizer {
router = round-robin
resizer {
@ -118,6 +121,10 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
assertRouting("/service-scatter-gather", ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather")
}
"be able to parse 'akka.actor.deployment._' with consistent-hashing router" in {
assertRouting("/service-consistent-hashing", ConsistentHashingRouter(1), "/service-consistent-hashing")
}
"be able to parse 'akka.actor.deployment._' with router resizer" in {
val resizer = DefaultResizer()
assertRouting("/service-resizer", RoundRobinRouter(resizer = Some(resizer)), "/service-resizer")

View file

@ -5,7 +5,6 @@
package akka.actor
import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.testkit._
import TestEvent.Mute
@ -15,6 +14,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import akka.util.Timeout
import scala.concurrent.util.Duration
import scala.concurrent.util.FiniteDuration
object FSMActorSpec {
val timeout = Timeout(2 seconds)
@ -33,7 +33,7 @@ object FSMActorSpec {
case object Locked extends LockState
case object Open extends LockState
class Lock(code: String, timeout: Duration, latches: Latches) extends Actor with FSM[LockState, CodeState] {
class Lock(code: String, timeout: FiniteDuration, latches: Latches) extends Actor with FSM[LockState, CodeState] {
import latches._
@ -183,6 +183,10 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
"run onTermination upon ActorRef.stop()" in {
val started = TestLatch(1)
/*
* This lazy val trick is beyond evil: KIDS, DON'T TRY THIS AT HOME!
* It is necessary here because of the path-dependent type fsm.StopEvent.
*/
lazy val fsm = new Actor with FSM[Int, Null] {
override def preStart = { started.countDown }
startWith(1, null)

View file

@ -5,7 +5,6 @@
package akka.actor
import language.postfixOps
import akka.util.ByteString
import scala.concurrent.{ ExecutionContext, Await, Future, Promise }
import scala.concurrent.util.{ Duration, Deadline }
@ -17,6 +16,7 @@ import akka.pattern.ask
import java.net.{ Socket, InetSocketAddress, InetAddress, SocketAddress }
import scala.util.Failure
import annotation.tailrec
import scala.concurrent.util.FiniteDuration
object IOActorSpec {
@ -244,7 +244,10 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
* @param filter determines which exceptions should be retried
* @return a future containing the result or the last exception before a limit was hit.
*/
def retry[T](count: Option[Int] = None, timeout: Option[Duration] = None, delay: Option[Duration] = Some(100 millis), filter: Option[Throwable Boolean] = None)(future: Future[T])(implicit executor: ExecutionContext): Future[T] = {
def retry[T](count: Option[Int] = None,
timeout: Option[FiniteDuration] = None,
delay: Option[FiniteDuration] = Some(100 millis),
filter: Option[Throwable Boolean] = None)(future: Future[T])(implicit executor: ExecutionContext): Future[T] = {
val promise = Promise[T]()

View file

@ -5,13 +5,12 @@
package akka.actor
import language.postfixOps
import akka.testkit._
import scala.concurrent.util.duration._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import java.util.concurrent.TimeoutException
import scala.concurrent.util.Duration
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ReceiveTimeoutSpec extends AkkaSpec {
@ -65,7 +64,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
case ReceiveTimeout
count.incrementAndGet
timeoutLatch.open
context.resetReceiveTimeout()
context.setReceiveTimeout(Duration.Undefined)
}
}))

View file

@ -4,7 +4,6 @@
package akka.actor
import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.Timeout
import scala.concurrent.{ Await, Future, Promise }
@ -21,6 +20,7 @@ import akka.serialization.JavaSerializer
import akka.actor.TypedActor._
import java.lang.IllegalStateException
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
import scala.concurrent.util.FiniteDuration
object TypedActorSpec {
@ -203,10 +203,10 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
def newFooBar: Foo = newFooBar(Duration(2, "s"))
def newFooBar(d: Duration): Foo =
def newFooBar(d: FiniteDuration): Foo =
TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)))
def newFooBar(dispatcher: String, d: Duration): Foo =
def newFooBar(dispatcher: String, d: FiniteDuration): Foo =
TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)).withDispatcher(dispatcher))
def newStacked(): Stacked =

View file

@ -43,6 +43,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getBoolean("akka.jvm-exit-on-fatal-error") must be(true)
settings.JvmExitOnFatalError must be(true)
getInt("akka.actor.deployment.default.virtual-nodes-factor") must be(10)
settings.DefaultVirtualNodesFactor must be(10)
}
{

View file

@ -68,16 +68,6 @@ class AskSpec extends AkkaSpec {
}.getMessage must be === expectedMsg
}
"return broken promises on infinite timeout" in {
implicit val timeout = Timeout.never
val echo = system.actorOf(Props(new Actor { def receive = { case x sender ! x } }))
val f = echo ? "foo"
val expectedMsg = "Timeouts to `ask` must be finite. Question not sent to [%s]" format echo
intercept[IllegalArgumentException] {
Await.result(f, remaining)
}.getMessage must be === expectedMsg
}
}
}

View file

@ -0,0 +1,104 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import scala.concurrent.Await
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.routing.ConsistentHashingRouter.ConsistentHashable
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.testkit.AkkaSpec
import akka.testkit._
object ConsistentHashingRouterSpec {
val config = """
akka.actor.deployment {
/router1 {
router = consistent-hashing
nr-of-instances = 3
virtual-nodes-factor = 17
}
/router2 {
router = consistent-hashing
nr-of-instances = 5
}
}
"""
class Echo extends Actor {
def receive = {
case _ sender ! self
}
}
case class Msg(key: Any, data: String) extends ConsistentHashable {
override def consistentHashKey = key
}
case class MsgKey(name: String)
case class Msg2(key: Any, data: String)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.config) with DefaultTimeout with ImplicitSender {
import akka.routing.ConsistentHashingRouterSpec._
implicit val ec = system.dispatcher
val router1 = system.actorOf(Props[Echo].withRouter(FromConfig()), "router1")
"consistent hashing router" must {
"create routees from configuration" in {
val currentRoutees = Await.result(router1 ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees]
currentRoutees.routees.size must be(3)
}
"select destination based on consistentHashKey of the message" in {
router1 ! Msg("a", "A")
val destinationA = expectMsgType[ActorRef]
router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
expectMsg(destinationA)
router1 ! Msg(17, "B")
val destinationB = expectMsgType[ActorRef]
router1 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17)
expectMsg(destinationB)
router1 ! Msg(MsgKey("c"), "C")
val destinationC = expectMsgType[ActorRef]
router1 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c"))
expectMsg(destinationC)
}
"select destination with defined consistentHashRoute" in {
def hashMapping: ConsistentHashMapping = {
case Msg2(key, data) key
}
val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter(
hashMapping = hashMapping)), "router2")
router2 ! Msg2("a", "A")
val destinationA = expectMsgType[ActorRef]
router2 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
expectMsg(destinationA)
router2 ! Msg2(17, "B")
val destinationB = expectMsgType[ActorRef]
router2 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17)
expectMsg(destinationB)
router2 ! Msg2(MsgKey("c"), "C")
val destinationC = expectMsgType[ActorRef]
router2 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c"))
expectMsg(destinationC)
}
}
}

View file

@ -4,7 +4,6 @@
package akka.routing
import language.postfixOps
import akka.actor.Actor
import akka.testkit._
import akka.actor.Props
@ -15,6 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.pattern.ask
import scala.concurrent.util.Duration
import java.util.concurrent.TimeoutException
import scala.concurrent.util.FiniteDuration
object ResizerSpec {
@ -174,8 +174,8 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
val router = system.actorOf(Props(new Actor {
def receive = {
case d: Duration Thread.sleep(d.dilated.toMillis); sender ! "done"
case "echo" sender ! "reply"
case d: FiniteDuration Thread.sleep(d.dilated.toMillis); sender ! "done"
case "echo" sender ! "reply"
}
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
@ -190,7 +190,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
routees(router) must be(3)
def loop(loops: Int, d: Duration) = {
def loop(loops: Int, d: FiniteDuration) = {
for (m 0 until loops) router ! d
for (m 0 until loops) expectMsg(d * 3, "done")
}

View file

@ -171,8 +171,6 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
expectMsgType[ActorKilledException]
//#supervision
val router2 = system.actorOf(Props.empty.withRouter(RoundRobinRouter(1).withSupervisorStrategy(escalator)))
router2 ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 2) intercept {

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell;
package akka.actor.dungeon;
import akka.actor.ActorCell;
import akka.util.Unsafe;
@ -14,9 +14,9 @@ final class AbstractActorCell {
static {
try {
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Dispatch$$_mailboxDoNotCallMeDirectly"));
childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_childrenRefsDoNotCallMeDirectly"));
nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_nextNameDoNotCallMeDirectly"));
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$dungeon$Dispatch$$_mailboxDoNotCallMeDirectly"));
childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$dungeon$Children$$_childrenRefsDoNotCallMeDirectly"));
nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$dungeon$Children$$_nextNameDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}

View file

@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import akka.event.LoggingAdapter;
import akka.util.Unsafe;
@ -241,7 +242,7 @@ public class HashedWheelTimer implements Timer {
return new HashedWheelTimeout(this, task, time);
}
public Timeout newTimeout(TimerTask task, Duration delay) {
public Timeout newTimeout(TimerTask task, FiniteDuration delay) {
final long currentTime = System.nanoTime();
if (task == null) {

View file

@ -15,9 +15,9 @@
*/
package akka.util.internal;
import scala.concurrent.util.Duration;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import scala.concurrent.util.FiniteDuration;
/**
* Schedules {@link TimerTask}s for one-time future execution in a background
@ -42,7 +42,7 @@ public interface Timer {
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
Timeout newTimeout(TimerTask task, Duration delay);
Timeout newTimeout(TimerTask task, FiniteDuration delay);
/**
* Releases all resources acquired by this {@link Timer} and cancels all

View file

@ -108,6 +108,9 @@ akka {
# within is the timeout used for routers containing future calls
within = 5 seconds
# number of virtual nodes per node for consistent-hashing router
virtual-nodes-factor = 10
routees {
# Alternatively to giving nr-of-instances you can specify the full
# paths of those actors which should be routed to. This setting takes

View file

@ -4,13 +4,13 @@
package akka.actor
import cell.ChildrenContainer.{ WaitingForChildren }
import java.io.{ ObjectOutputStream, NotSerializableException }
import scala.annotation.tailrec
import scala.collection.immutable.TreeSet
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
import akka.actor.cell.ChildrenContainer
import akka.actor.dungeon.ChildrenContainer
import akka.actor.dungeon.ChildrenContainer.WaitingForChildren
import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated }
import akka.event.Logging.{ LogEvent, Debug, Error }
import akka.japi.Procedure
@ -55,23 +55,25 @@ trait ActorContext extends ActorRefFactory {
def props: Props
/**
* Gets the current receive timeout
* Gets the current receive timeout.
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def receiveTimeout: Option[Duration]
def receiveTimeout: Duration
/**
* Defines the default timeout for an initial receive invocation.
* Defines the inactivity timeout after which the sending of a `ReceiveTimeout` message is triggered.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
* 1 millisecond is the minimum supported timeout.
*
* Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after
* another message was enqueued; hence it is '''not guaranteed''' that upon reception of the receive
* timeout there must have been an idle period beforehand as configured via this method.
*
* Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity
* periods). Pass in `Duration.Undefined` to switch off this feature.
*/
def setReceiveTimeout(timeout: Duration): Unit
/**
* Clears the receive timeout, i.e. deactivates this feature.
*/
def resetReceiveTimeout(): Unit
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* Puts the behavior on top of the hotswap stack.
@ -290,11 +292,11 @@ private[akka] class ActorCell(
val props: Props,
val parent: InternalActorRef)
extends UntypedActorContext with Cell
with cell.ReceiveTimeout
with cell.Children
with cell.Dispatch
with cell.DeathWatch
with cell.FaultHandling {
with dungeon.ReceiveTimeout
with dungeon.Children
with dungeon.Dispatch
with dungeon.DeathWatch
with dungeon.FaultHandling {
import ActorCell._

View file

@ -17,7 +17,7 @@ import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.cell.ChildrenContainer
import akka.actor.dungeon.ChildrenContainer
import scala.concurrent.util.FiniteDuration
import util.{ Failure, Success }
@ -166,6 +166,8 @@ object ActorSystem {
final val Daemonicity: Boolean = getBoolean("akka.daemonic")
final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor")
if (ConfigVersion != Version)
throw new akka.ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")

View file

@ -143,8 +143,6 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
val nrOfInstances = deployment.getInt("nr-of-instances")
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
val router: RouterConfig = deployment.getString("router") match {
@ -152,8 +150,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, resizer)
case "smallest-mailbox" SmallestMailboxRouter(nrOfInstances, routees, resizer)
case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case "scatter-gather"
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case "consistent-hashing"
val vnodes = deployment.getInt("virtual-nodes-factor")
ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes)
case fqn
val args = Seq(classOf[Config] -> deployment)
dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({

View file

@ -85,7 +85,10 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces
getClassFor(fqcn) flatMap { c createInstanceFor(c, args) }
override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = {
getClassFor(fqcn) flatMap { c
val classTry =
if (fqcn.endsWith("$")) getClassFor(fqcn)
else getClassFor(fqcn + "$") recoverWith { case _ getClassFor(fqcn) }
classTry flatMap { c
Try {
val module = c.getDeclaredField("MODULE$")
module.setAccessible(true)

View file

@ -4,11 +4,11 @@
package akka.actor
import language.implicitConversions
import akka.util._
import scala.concurrent.util.Duration
import scala.collection.mutable
import akka.routing.{ Deafen, Listen, Listeners }
import scala.concurrent.util.FiniteDuration
object FSM {
@ -92,7 +92,7 @@ object FSM {
private val scheduler = context.system.scheduler
private implicit val executionContext = context.dispatcher
def schedule(actor: ActorRef, timeout: Duration): Unit =
def schedule(actor: ActorRef, timeout: FiniteDuration): Unit =
ref = Some(
if (repeat) scheduler.schedule(timeout, timeout, actor, this)
else scheduler.scheduleOnce(timeout, actor, this))
@ -121,15 +121,18 @@ object FSM {
* name, the state data, possibly custom timeout, stop reason and replies
* accumulated while processing the last message.
*/
case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
/**
* Modify state transition descriptor to include a state timeout for the
* next state. This timeout overrides any default timeout set for the next
* state.
*
* Use Duration.Inf to deactivate an existing timeout.
*/
def forMax(timeout: Duration): State[S, D] = {
copy(timeout = Some(timeout))
def forMax(timeout: Duration): State[S, D] = timeout match {
case f: FiniteDuration copy(timeout = Some(f))
case _ copy(timeout = None)
}
/**
@ -245,7 +248,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
type State = FSM.State[S, D]
type StateFunction = scala.PartialFunction[Event, State]
type Timeout = Option[Duration]
type Timeout = Option[FiniteDuration]
type TransitionHandler = PartialFunction[(S, S), Unit]
/*
@ -279,7 +282,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
* @param stateTimeout default state timeout for this state
* @param stateFunction partial function describing response to input
*/
final def when(stateName: S, stateTimeout: Duration = null)(stateFunction: StateFunction): Unit =
final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit =
register(stateName, stateFunction, Option(stateTimeout))
/**
@ -339,7 +342,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
* @param repeat send once if false, scheduleAtFixedRate if true
* @return current state descriptor
*/
final def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = {
final def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean): State = {
if (debugEvent)
log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg)
if (timers contains name) {

View file

@ -12,7 +12,7 @@ import scala.annotation.tailrec
import scala.collection.mutable.Queue
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.cell.ChildrenContainer
import akka.actor.dungeon.ChildrenContainer
import akka.dispatch.{ Envelope, Supervise, SystemMessage, Terminate }
import akka.event.Logging.Warning
import akka.util.Unsafe

View file

@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import akka.util.internal._
import concurrent.ExecutionContext
import scala.concurrent.util.FiniteDuration
//#scheduler
/**
@ -29,13 +30,13 @@ trait Scheduler {
* Schedules a message to be sent repeatedly with an initial delay and
* frequency. E.g. if you would like a message to be sent immediately and
* thereafter every 500ms you would set delay=Duration.Zero and
* frequency=Duration(500, TimeUnit.MILLISECONDS)
* interval=Duration(500, TimeUnit.MILLISECONDS)
*
* Java & Scala API
*/
def schedule(
initialDelay: Duration,
frequency: Duration,
initialDelay: FiniteDuration,
interval: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable
@ -43,23 +44,23 @@ trait Scheduler {
* Schedules a function to be run repeatedly with an initial delay and a
* frequency. E.g. if you would like the function to be run after 2 seconds
* and thereafter every 100ms you would set delay = Duration(2, TimeUnit.SECONDS)
* and frequency = Duration(100, TimeUnit.MILLISECONDS)
* and interval = Duration(100, TimeUnit.MILLISECONDS)
*
* Scala API
*/
def schedule(
initialDelay: Duration, frequency: Duration)(f: Unit)(implicit executor: ExecutionContext): Cancellable
initialDelay: FiniteDuration, interval: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a function to be run repeatedly with an initial delay and
* a frequency. E.g. if you would like the function to be run after 2
* seconds and thereafter every 100ms you would set delay = Duration(2,
* TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS)
* TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS)
*
* Java API
*/
def schedule(
initialDelay: Duration, frequency: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a Runnable to be run once with a delay, i.e. a time period that
@ -67,7 +68,7 @@ trait Scheduler {
*
* Java & Scala API
*/
def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a message to be sent once with a delay, i.e. a time period that has
@ -75,7 +76,7 @@ trait Scheduler {
*
* Java & Scala API
*/
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable
def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a function to be run once with a delay, i.e. a time period that has
@ -83,7 +84,7 @@ trait Scheduler {
*
* Scala API
*/
def scheduleOnce(delay: Duration)(f: Unit)(implicit executor: ExecutionContext): Cancellable
def scheduleOnce(delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable
}
//#scheduler
@ -120,8 +121,8 @@ trait Cancellable {
* returned from stop().
*/
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) extends Scheduler with Closeable {
override def schedule(initialDelay: Duration,
delay: Duration,
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable = {
val continuousCancellable = new ContinuousCancellable
@ -142,12 +143,12 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
initialDelay))
}
override def schedule(initialDelay: Duration,
delay: Duration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
schedule(initialDelay, delay, new Runnable { override def run = f })
override def schedule(initialDelay: Duration,
delay: Duration,
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = {
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
@ -163,20 +164,20 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
initialDelay))
}
override def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
new DefaultCancellable(
hashedWheelTimer.newTimeout(
new TimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) },
delay))
override def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable =
override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable =
scheduleOnce(delay, new Runnable { override def run = receiver ! message })
override def scheduleOnce(delay: Duration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
override def scheduleOnce(delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
scheduleOnce(delay, new Runnable { override def run = f })
private trait ContinuousScheduling { this: TimerTask
def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) {
def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) {
try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException } // stop recurring if timer is stopped
}
}

View file

@ -4,7 +4,6 @@
package akka.actor
import language.existentials
import akka.japi.{ Creator, Option JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.Timeout
@ -20,6 +19,7 @@ import scala.reflect.ClassTag
import akka.serialization.{ JavaSerializer, SerializationExtension }
import java.io.ObjectStreamException
import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.FiniteDuration
/**
* A TypedActorFactory is something that can created TypedActor instances.
@ -421,7 +421,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
/**
* INTERNAL USE ONLY
*/
private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: Duration) {
private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) {
@throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match {
case null throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value")
case some toTypedActorInvocationHandler(some)

View file

@ -1,54 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import ReceiveTimeout.emptyReceiveTimeoutData
import akka.actor.ActorCell
import akka.actor.ActorCell.emptyCancellable
import akka.actor.Cancellable
import scala.concurrent.util.Duration
private[akka] object ReceiveTimeout {
final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable)
}
private[akka] trait ReceiveTimeout { this: ActorCell
import ReceiveTimeout._
import ActorCell._
private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData
final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match {
case Duration.Undefined None
case duration Some(duration)
}
final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined))
final def setReceiveTimeout(timeout: Duration): Unit =
receiveTimeoutData = (
if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout,
receiveTimeoutData._2)
final def resetReceiveTimeout(): Unit = setReceiveTimeout(None)
final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) {
recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout)(this.dispatcher))
} else cancelReceiveTimeout()
}
final def cancelReceiveTimeout(): Unit =
if (receiveTimeoutData._2 ne emptyCancellable) {
receiveTimeoutData._2.cancel()
receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable)
}
}

View file

@ -129,10 +129,10 @@ trait Inbox { this: ActorDSL.type ⇒
val next = clientsByTimeout.head.deadline
import context.dispatcher
if (currentDeadline.isEmpty) {
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft.asInstanceOf[FiniteDuration], self, Kick)))
} else if (currentDeadline.get._1 != next) {
currentDeadline.get._2.cancel()
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft.asInstanceOf[FiniteDuration], self, Kick)))
}
}
}
@ -169,7 +169,7 @@ trait Inbox { this: ActorDSL.type ⇒
* this method within an actor!</b>
*/
def receive(timeout: FiniteDuration = defaultTimeout): Any = {
implicit val t = Timeout(timeout + extraTime)
implicit val t = Timeout((timeout + extraTime).asInstanceOf[FiniteDuration])
Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf)
}
@ -186,7 +186,7 @@ trait Inbox { this: ActorDSL.type ⇒
* this method within an actor!</b>
*/
def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = {
implicit val t = Timeout(timeout + extraTime)
implicit val t = Timeout((timeout + extraTime).asInstanceOf[FiniteDuration])
predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf))
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
package akka.actor.dungeon
import scala.annotation.tailrec
import scala.collection.JavaConverters.asJavaIterableConverter

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
package akka.actor.dungeon
import scala.collection.immutable.TreeMap

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
package akka.actor.dungeon
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated }
import akka.dispatch.{ Watch, Unwatch }

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
package akka.actor.dungeon
import scala.annotation.tailrec
import akka.actor.{ ActorRef, ActorCell }

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
package akka.actor.dungeon
import scala.annotation.tailrec
import akka.actor.{ PreRestartException, PostRestartException, InternalActorRef, Failed, ActorRef, ActorInterruptedException, ActorCell, Actor }
@ -16,6 +16,7 @@ import akka.actor.PreRestartException
import akka.actor.Failed
import akka.actor.PostRestartException
import akka.event.Logging.Debug
import scala.concurrent.util.Duration
private[akka] trait FaultHandling { this: ActorCell
@ -121,7 +122,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status)
assert(perpetrator == self)
setReceiveTimeout(None)
setReceiveTimeout(Duration.Undefined)
cancelReceiveTimeout
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
@ -137,7 +138,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
protected def terminate() {
setReceiveTimeout(None)
setReceiveTimeout(Duration.Undefined)
cancelReceiveTimeout
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.dungeon
import ReceiveTimeout.emptyReceiveTimeoutData
import akka.actor.ActorCell
import akka.actor.ActorCell.emptyCancellable
import akka.actor.Cancellable
import scala.concurrent.util.Duration
import scala.concurrent.util.FiniteDuration
private[akka] object ReceiveTimeout {
final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable)
}
private[akka] trait ReceiveTimeout { this: ActorCell
import ReceiveTimeout._
import ActorCell._
private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData
final def receiveTimeout: Duration = receiveTimeoutData._1
final def setReceiveTimeout(timeout: Duration): Unit = receiveTimeoutData = receiveTimeoutData.copy(_1 = timeout)
final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
//Only reschedule if desired and there are currently no more messages to be processed
if (!mailbox.hasMessages) recvtimeout._1 match {
case f: FiniteDuration
recvtimeout._2.cancel() //Cancel any ongoing future
val task = system.scheduler.scheduleOnce(f, self, akka.actor.ReceiveTimeout)(this.dispatcher)
receiveTimeoutData = (f, task)
case _ cancelReceiveTimeout()
}
else cancelReceiveTimeout()
}
final def cancelReceiveTimeout(): Unit =
if (receiveTimeoutData._2 ne emptyCancellable) {
receiveTimeoutData._2.cancel()
receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable)
}
}

View file

@ -16,6 +16,7 @@ import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool }
import scala.concurrent.util.Duration
import scala.concurrent.{ ExecutionContext, Await, Awaitable }
import scala.util.control.NonFatal
import scala.concurrent.util.FiniteDuration
final case class Envelope private (val message: Any, val sender: ActorRef)
@ -316,7 +317,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*
* INTERNAL API
*/
protected[akka] def shutdownTimeout: Duration
protected[akka] def shutdownTimeout: FiniteDuration
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference

View file

@ -11,6 +11,7 @@ import akka.util.Helpers
import java.util.{ Comparator, Iterator }
import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet }
import akka.actor.ActorSystemImpl
import scala.concurrent.util.FiniteDuration
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -33,7 +34,7 @@ class BalancingDispatcher(
throughputDeadlineTime: Duration,
mailboxType: MailboxType,
_executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
_shutdownTimeout: Duration,
_shutdownTimeout: FiniteDuration,
attemptTeamWork: Boolean)
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) {

View file

@ -12,6 +12,7 @@ import java.util.concurrent.{ ExecutorService, RejectedExecutionException }
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.util.Duration
import scala.concurrent.Awaitable
import scala.concurrent.util.FiniteDuration
/**
* The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
@ -32,7 +33,7 @@ class Dispatcher(
val throughputDeadlineTime: Duration,
val mailboxType: MailboxType,
executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
val shutdownTimeout: Duration)
val shutdownTimeout: FiniteDuration)
extends MessageDispatcher(_prerequisites) {
private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate {

View file

@ -6,6 +6,7 @@ package akka.dispatch
import akka.actor.ActorCell
import scala.concurrent.util.Duration
import scala.concurrent.util.FiniteDuration
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
@ -18,7 +19,7 @@ class PinnedDispatcher(
_actor: ActorCell,
_id: String,
_mailboxType: MailboxType,
_shutdownTimeout: Duration,
_shutdownTimeout: FiniteDuration,
_threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig())
extends Dispatcher(_prerequisites,
_id,

View file

@ -78,8 +78,7 @@ trait AskSupport {
actorRef.tell(message)
Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef))
case ref: InternalActorRef
if (!timeout.duration.isFinite) Future.failed[Any](new IllegalArgumentException("Timeouts to `ask` must be finite. Question not sent to [%s]" format actorRef))
else if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef))
if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef))
else {
val provider = ref.provider
val a = PromiseActorRef(provider, timeout)

View file

@ -10,7 +10,7 @@ import akka.util.Unsafe
import scala.util.control.NoStackTrace
import java.util.concurrent.{ Callable, CopyOnWriteArrayList }
import scala.concurrent.{ ExecutionContext, Future, Promise, Await }
import scala.concurrent.util.{ Duration, Deadline }
import scala.concurrent.util.{ FiniteDuration, Deadline }
import scala.concurrent.util.duration._
import scala.util.control.NonFatal
import scala.util.Success
@ -38,8 +38,8 @@ object CircuitBreaker {
* @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
*/
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker =
new CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(syncExecutionContext)
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(syncExecutionContext)
/**
* Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed
@ -52,8 +52,8 @@ object CircuitBreaker {
* @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
*/
def create(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker =
apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)
def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
apply(scheduler, maxFailures, callTimeout, resetTimeout)
}
/**
@ -76,9 +76,9 @@ object CircuitBreaker {
* @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit
* @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
*/
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) = {
def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = {
this(scheduler, maxFailures, callTimeout, resetTimeout)(executor)
}
@ -409,7 +409,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] =
if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)).future
if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(0.seconds)).future
/**
* Reset breaker on successful call.
@ -453,7 +453,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @return Future containing result of protected call
*/
override def invoke[T](body: Future[T]): Future[T] =
Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)).future
Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft.asInstanceOf[FiniteDuration])).future
/**
* Calculate remaining timeout to inform the caller in case a backoff algorithm is useful
@ -510,6 +510,6 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati
* @param message Defaults to "Circuit Breaker is open; calls are failing fast"
*/
class CircuitBreakerOpenException(
val remainingDuration: Duration,
val remainingDuration: FiniteDuration,
message: String = "Circuit Breaker is open; calls are failing fast")
extends AkkaException(message) with NoStackTrace

View file

@ -8,13 +8,14 @@ import scala.concurrent.util.Duration
import scala.concurrent.{ ExecutionContext, Promise, Future }
import akka.actor._
import scala.util.control.NonFatal
import scala.concurrent.util.FiniteDuration
trait FutureTimeoutSupport {
/**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value
* after the specified duration.
*/
def after[T](duration: Duration, using: Scheduler)(value: Future[T])(implicit ec: ExecutionContext): Future[T] =
def after[T](duration: FiniteDuration, using: Scheduler)(value: Future[T])(implicit ec: ExecutionContext): Future[T] =
if (duration.isFinite() && duration.length < 1) {
try value catch { case NonFatal(t) Future.failed(t) }
} else {

View file

@ -10,6 +10,7 @@ import akka.dispatch.{ Unwatch, Watch }
import scala.concurrent.Future
import scala.concurrent.util.Duration
import scala.util.Success
import scala.concurrent.util.FiniteDuration
trait GracefulStopSupport {
/**
@ -36,7 +37,7 @@ trait GracefulStopSupport {
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
def gracefulStop(target: ActorRef, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) Future successful true
else system match {
case e: ExtendedActorSystem

View file

@ -6,6 +6,7 @@ package akka.pattern
import akka.actor.Scheduler
import scala.concurrent.ExecutionContext
import java.util.concurrent.Callable
import scala.concurrent.util.FiniteDuration
object Patterns {
import akka.actor.{ ActorRef, ActorSystem }
@ -103,20 +104,20 @@ object Patterns {
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] =
def gracefulStop(target: ActorRef, timeout: FiniteDuration, system: ActorSystem): Future[java.lang.Boolean] =
scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]
/**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration.
*/
def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] =
def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] =
scalaAfter(duration, scheduler)(value.call())(context)
/**
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value
* after the specified duration.
*/
def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] =
def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] =
scalaAfter(duration, scheduler)(value)(context)
}

View file

@ -1,61 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.actor._
/**
* An Iterable that also contains a version.
*/
trait VersionedIterable[A] {
def version: Long
def iterable: Iterable[A]
def apply(): Iterable[A] = iterable
}
/**
* Manages connections (ActorRefs) for a router.
*/
trait ConnectionManager {
/**
* A version that is useful to see if there is any change in the connections. If there is a change, a router is
* able to update its internal datastructures.
*/
def version: Long
/**
* Returns the number of 'available' connections. Value could be stale as soon as received, and this method can't be combined (easily)
* with an atomic read of and size and version.
*/
def size: Int
/**
* Returns if the number of 'available' is 0 or not. Value could be stale as soon as received, and this method can't be combined (easily)
* with an atomic read of and isEmpty and version.
*/
def isEmpty: Boolean
/**
* Shuts the connection manager down, which stops all managed actors
*/
def shutdown(): Unit
/**
* Returns a VersionedIterator containing all connected ActorRefs at some moment in time. Since there is
* the time element, also the version is included to be able to read the data (the connections) and the version
* in an atomic manner.
*
* This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
* view of some set of connections.
*/
def connections: VersionedIterable[ActorRef]
/**
* Removes a connection from the connection manager.
*
* @param ref the dead
*/
def remove(deadRef: ActorRef): Unit
}

View file

@ -4,255 +4,126 @@
package akka.routing
import scala.collection.immutable.{ TreeSet, Seq }
import scala.collection.mutable.{ Buffer, Map }
// =============================================================================================
// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license
// =============================================================================================
import scala.collection.immutable.SortedMap
import scala.reflect.ClassTag
import java.util.Arrays
/**
* Consistent Hashing node ring abstraction.
* Consistent Hashing node ring implementation.
*
* A good explanation of Consistent Hashing:
* http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html
*
* Note that toString of the ring nodes are used for the node
* hash, i.e. make sure it is different for different nodes.
*
* Not thread-safe, to be used from within an Actor or protected some other way.
*/
class ConsistentHash[T](nodes: Seq[T], replicas: Int) {
private val cluster = Buffer[T]()
private var sortedKeys = TreeSet[Long]()
private var ring = Map[Long, T]()
class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) {
nodes.foreach(this += _)
import ConsistentHash._
def +=(node: T): Unit = {
cluster += node
(1 to replicas) foreach { replica
val key = hashFor((node + ":" + replica).getBytes("UTF-8"))
ring += (key -> node)
sortedKeys = sortedKeys + key
}
if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1")
// arrays for fast binary search and access
// nodeHashRing is the sorted hash values of the nodes
// nodeRing is the nodes sorted in the same order as nodeHashRing, i.e. same index
private val (nodeHashRing: Array[Int], nodeRing: Array[T]) = {
val (nhr: Seq[Int], nr: Seq[T]) = nodes.toSeq.unzip
(nhr.toArray, nr.toArray)
}
def -=(node: T): Unit = {
cluster -= node
(1 to replicas) foreach { replica
val key = hashFor((node + ":" + replica).getBytes("UTF-8"))
ring -= key
sortedKeys = sortedKeys - key
}
}
/**
* Adds a node to the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
*/
def :+(node: T): ConsistentHash[T] =
new ConsistentHash(nodes ++ ((1 to virtualNodesFactor) map { r (nodeHashFor(node, r) -> node) }), virtualNodesFactor)
def nodeFor(key: Array[Byte]): T = {
val hash = hashFor(key)
if (sortedKeys contains hash) ring(hash)
/**
* Adds a node to the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
* JAVA API
*/
def add(node: T): ConsistentHash[T] = this :+ node
/**
* Removes a node from the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
*/
def :-(node: T): ConsistentHash[T] =
new ConsistentHash(nodes -- ((1 to virtualNodesFactor) map { r nodeHashFor(node, r) }), virtualNodesFactor)
/**
* Removes a node from the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
* JAVA API
*/
def remove(node: T): ConsistentHash[T] = this :- node
// converts the result of Arrays.binarySearch into a index in the nodeRing array
// see documentation of Arrays.binarySearch for what it returns
private def idx(i: Int): Int = {
if (i >= 0) i // exact match
else {
if (hash < sortedKeys.firstKey) ring(sortedKeys.firstKey)
else if (hash > sortedKeys.lastKey) ring(sortedKeys.lastKey)
else ring(sortedKeys.rangeImpl(None, Some(hash)).lastKey)
val j = math.abs(i + 1)
if (j >= nodeHashRing.length) 0 // after last, use first
else j // next node clockwise
}
}
private def hashFor(bytes: Array[Byte]): Long = {
val hash = MurmurHash.arrayHash(bytes)
if (hash == Int.MinValue) hash + 1
math.abs(hash)
}
}
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
/**
* An implementation of Austin Appleby's MurmurHash 3.0 algorithm
* (32 bit version); reference: http://code.google.com/p/smhasher
*
* This is the hash used by collections and case classes (including
* tuples).
*
* @author Rex Kerr
* @version 2.9
* @since 2.9
*/
import java.lang.Integer.{ rotateLeft rotl }
/**
* A class designed to generate well-distributed non-cryptographic
* hashes. It is designed to be passed to a collection's foreach method,
* or can take individual hash values with append. Its own hash code is
* set equal to the hash code of whatever it is hashing.
*/
class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T Unit) {
import MurmurHash._
private var h = startHash(seed)
private var c = hiddenMagicA
private var k = hiddenMagicB
private var hashed = false
private var hashvalue = h
/** Begin a new hash using the same seed. */
def reset(): Unit = {
h = startHash(seed)
c = hiddenMagicA
k = hiddenMagicB
hashed = false
}
/** Incorporate the hash value of one item. */
def apply(t: T): Unit = {
h = extendHash(h, t.##, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
hashed = false
}
/** Incorporate a known hash value. */
def append(i: Int): Unit = {
h = extendHash(h, i, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
hashed = false
}
/** Retrieve the hash value */
def hash: Int = {
if (!hashed) {
hashvalue = finalizeHash(h)
hashed = true
}
hashvalue
}
override def hashCode: Int = hash
}
/**
* An object designed to generate well-distributed non-cryptographic
* hashes. It is designed to hash a collection of integers; along with
* the integers to hash, it generates two magic streams of integers to
* increase the distribution of repetitive input sequences. Thus,
* three methods need to be called at each step (to start and to
* incorporate a new integer) to update the values. Only one method
* needs to be called to finalize the hash.
*/
object MurmurHash {
// Magic values used for MurmurHash's 32 bit hash.
// Don't change these without consulting a hashing expert!
final private val visibleMagic: Int = 0x971e137b
final private val hiddenMagicA: Int = 0x95543787
final private val hiddenMagicB: Int = 0x2ad7eb25
final private val visibleMixer: Int = 0x52dce729
final private val hiddenMixerA: Int = 0x7b7d159c
final private val hiddenMixerB: Int = 0x6bce6396
final private val finalMixer1: Int = 0x85ebca6b
final private val finalMixer2: Int = 0xc2b2ae35
// Arbitrary values used for hashing certain classes
final private val seedString: Int = 0xf7ca7fd2
final private val seedArray: Int = 0x3c074a61
/** The first 23 magic integers from the first stream are stored here */
val storedMagicA: Array[Int] =
Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray
/** The first 23 magic integers from the second stream are stored here */
val storedMagicB: Array[Int] =
Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray
/** Begin a new hash with a seed value. */
def startHash(seed: Int): Int = seed ^ visibleMagic
/** The initial magic integers in the first stream. */
def startMagicA: Int = hiddenMagicA
/** The initial magic integer in the second stream. */
def startMagicB: Int = hiddenMagicB
/**
* Incorporates a new value into an existing hash.
*
* @param hash the prior hash value
* @param value the new value to incorporate
* @param magicA a magic integer from the stream
* @param magicB a magic integer from a different stream
* @return the updated hash value
*/
def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int =
(hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer
/** Given a magic integer from the first stream, compute the next */
def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA
/** Given a magic integer from the second stream, compute the next */
def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB
/** Once all hashes have been incorporated, this performs a final mixing */
def finalizeHash(hash: Int): Int = {
var i = (hash ^ (hash >>> 16))
i *= finalMixer1
i ^= (i >>> 13)
i *= finalMixer2
i ^= (i >>> 16)
i
}
/** Compute a high-quality hash of an array */
def arrayHash[@specialized T](a: Array[T]): Int = {
var h = startHash(a.length * seedArray)
var c = hiddenMagicA
var k = hiddenMagicB
var j = 0
while (j < a.length) {
h = extendHash(h, a(j).##, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
j += 1
}
finalizeHash(h)
}
/** Compute a high-quality hash of a string */
def stringHash(s: String): Int = {
var h = startHash(s.length * seedString)
var c = hiddenMagicA
var k = hiddenMagicB
var j = 0
while (j + 1 < s.length) {
val i = (s.charAt(j) << 16) + s.charAt(j + 1);
h = extendHash(h, i, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
j += 2
}
if (j < s.length) h = extendHash(h, s.charAt(j), c, k)
finalizeHash(h)
}
/**
* Compute a hash that is symmetric in its arguments--that is,
* where the order of appearance of elements does not matter.
* This is useful for hashing sets, for example.
* Get the node responsible for the data key.
* Can only be used if nodes exists in the node ring,
* otherwise throws `IllegalStateException`
*/
def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = {
var a, b, n = 0
var c = 1
xs.foreach(i {
val h = i.##
a += h
b ^= h
if (h != 0) c *= h
n += 1
})
var h = startHash(seed * n)
h = extendHash(h, a, storedMagicA(0), storedMagicB(0))
h = extendHash(h, b, storedMagicA(1), storedMagicB(1))
h = extendHash(h, c, storedMagicA(2), storedMagicB(2))
finalizeHash(h)
def nodeFor(key: Array[Byte]): T = {
if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key)
nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key))))
}
/**
* Get the node responsible for the data key.
* Can only be used if nodes exists in the node ring,
* otherwise throws `IllegalStateException`
*/
def nodeFor(key: String): T = {
if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key)
nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key))))
}
/**
* Is the node ring empty, i.e. no nodes added or all removed.
*/
def isEmpty: Boolean = nodes.isEmpty
}
object ConsistentHash {
def apply[T: ClassTag](nodes: Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = {
new ConsistentHash(SortedMap.empty[Int, T] ++
(for (node nodes; vnode 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)),
virtualNodesFactor)
}
/**
* Factory method to create a ConsistentHash
* JAVA API
*/
def create[T](nodes: java.lang.Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = {
import scala.collection.JavaConverters._
apply(nodes.asScala, virtualNodesFactor)(ClassTag(classOf[Any].asInstanceOf[Class[T]]))
}
private def nodeHashFor(node: Any, vnode: Int): Int =
hashFor((node + ":" + vnode).getBytes("UTF-8"))
private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes)
private def hashFor(string: String): Int = MurmurHash.stringHash(string)
}

View file

@ -0,0 +1,297 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import scala.collection.JavaConversions.iterableAsScalaIterable
import scala.util.control.NonFatal
import akka.actor.ActorRef
import akka.actor.SupervisorStrategy
import akka.actor.Props
import akka.dispatch.Dispatchers
import akka.event.Logging
import akka.serialization.SerializationExtension
import java.util.concurrent.atomic.AtomicReference
object ConsistentHashingRouter {
/**
* Creates a new ConsistentHashingRouter, routing to the specified routees
*/
def apply(routees: Iterable[ActorRef]): ConsistentHashingRouter =
new ConsistentHashingRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = {
import scala.collection.JavaConverters._
apply(routees.asScala)
}
/**
* If you don't define the `hashMapping` when
* constructing the [[akka.routing.ConsistentHashingRouter]]
* the messages need to implement this interface to define what
* data to use for the consistent hash key. Note that it's not
* the hash, but the data to be hashed.
*
* If returning an `Array[Byte]` or String it will be used as is,
* otherwise the configured [[akka.akka.serialization.Serializer]]
* will be applied to the returned data.
*
* If messages can't implement this interface themselves,
* it's possible to wrap the messages in
* [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]],
* or use [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]]
*/
trait ConsistentHashable {
def consistentHashKey: Any
}
/**
* If you don't define the `hashMapping` when
* constructing the [[akka.routing.ConsistentHashingRouter]]
* and messages can't implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]]
* themselves they can we wrapped by this envelope instead. The
* router will only send the wrapped message to the destination,
* i.e. the envelope will be stripped off.
*/
@SerialVersionUID(1L)
final case class ConsistentHashableEnvelope(message: Any, hashKey: Any)
extends ConsistentHashable with RouterEnvelope {
override def consistentHashKey: Any = hashKey
}
/**
* Partial function from message to the data to
* use for the consistent hash key. Note that it's not
* the hash that is to be returned, but the data to be hashed.
*
* If returning an `Array[Byte]` or String it will be used as is,
* otherwise the configured [[akka.akka.serialization.Serializer]]
* will be applied to the returned data.
*/
type ConsistentHashMapping = PartialFunction[Any, Any]
@SerialVersionUID(1L)
object emptyConsistentHashMapping extends ConsistentHashMapping {
def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashMapping apply()")
}
/**
* JAVA API
* Mapping from message to the data to use for the consistent hash key.
* Note that it's not the hash that is to be returned, but the data to be
* hashed.
*
* May return `null` to indicate that the message is not handled by
* this mapping.
*
* If returning an `Array[Byte]` or String it will be used as is,
* otherwise the configured [[akka.akka.serialization.Serializer]]
* will be applied to the returned data.
*/
trait ConsistentHashMapper {
def hashKey(message: Any): Any
}
}
/**
* A Router that uses consistent hashing to select a connection based on the
* sent message.
*
* There is 3 ways to define what data to use for the consistent hash key.
*
* 1. You can define `hashMapping` / `withHashMapper`
* of the router to map incoming messages to their consistent hash key.
* This makes the decision transparent for the sender.
*
* 2. The messages may implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]].
* The key is part of the message and it's convenient to define it together
* with the message definition.
*
* 3. The messages can be be wrapped in a [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]]
* to define what data to use for the consistent hash key. The sender knows
* the key to use.
*
* These ways to define the consistent hash key can be use together and at
* the same time for one router. The `hashMapping` is tried first.
*
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
* sense as this means that the router should both create new actors and use the 'routees'
* actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* The router creates a head actor which supervises and/or monitors the
* routees. Instances are created as children of this actor, hence the
* children are not supervised by the parent of the router. Common choices are
* to always escalate (meaning that fault handling is always applied to all
* children simultaneously; this is the default) or use the parents strategy,
* which will result in routed children being treated individually, but it is
* possible as well to use Routers to give different supervisor strategies to
* different groups of children.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
* @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]]
* @param hashMapping partial function from message to the data to
* use for the consistent hash key
*/
@SerialVersionUID(1L)
case class ConsistentHashingRouter(
nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy,
val virtualNodesFactor: Int = 0,
val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping)
extends RouterConfig with ConsistentHashingLike {
/**
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Constructor that sets the routees to be used.
* Java API
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): ConsistentHashingRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the number of virtual nodes per node, used in [[akka.routing.ConsistantHash]]
*/
def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes)
/**
* Java API for setting the mapping from message to the data to use for the consistent hash key.
*/
def withHashMapper(mapping: ConsistentHashingRouter.ConsistentHashMapper) = {
copy(hashMapping = {
case message if (mapping.hashKey(message).asInstanceOf[AnyRef] ne null)
mapping.hashKey(message)
})
}
/**
* Uses the resizer of the given RouterConfig if this RouterConfig
* doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
* Uses the the `hashMapping` defined in code, since
* that can't be defined in configuration.
*/
override def withFallback(other: RouterConfig): RouterConfig = other match {
case _: FromConfig this
case otherRouter: ConsistentHashingRouter
val useResizer =
if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer
else this.resizer
copy(resizer = useResizer, hashMapping = otherRouter.hashMapping)
case _ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other))
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait ConsistentHashingLike { this: RouterConfig
import ConsistentHashingRouter._
def nrOfInstances: Int
def routees: Iterable[String]
def virtualNodesFactor: Int
def hashMapping: ConsistentHashMapping
override def createRoute(routeeProvider: RouteeProvider): Route = {
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
else routeeProvider.registerRouteesFor(routees)
}
val log = Logging(routeeProvider.context.system, routeeProvider.context.self)
val vnodes =
if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor
else virtualNodesFactor
// tuple of routees and the ConsistentHash, updated together in updateConsistentHash
val consistentHashRef = new AtomicReference[(IndexedSeq[ActorRef], ConsistentHash[ActorRef])]((null, null))
updateConsistentHash()
// update consistentHash when routees has changed
// changes to routees are rare and when no changes this is a quick operation
def updateConsistentHash(): ConsistentHash[ActorRef] = {
val oldConsistentHashTuple = consistentHashRef.get
val (oldConsistentHashRoutees, oldConsistentHash) = oldConsistentHashTuple
val currentRoutees = routeeProvider.routees
if (currentRoutees ne oldConsistentHashRoutees) {
// when other instance, same content, no need to re-hash, but try to set routees
val consistentHash =
if (currentRoutees == oldConsistentHashRoutees) oldConsistentHash
else ConsistentHash(currentRoutees, vnodes) // re-hash
// ignore, don't update, in case of CAS failure
consistentHashRef.compareAndSet(oldConsistentHashTuple, (currentRoutees, consistentHash))
consistentHash
} else oldConsistentHash
}
def target(hashData: Any): ActorRef = try {
val currentConsistenHash = updateConsistentHash()
if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters
else hashData match {
case bytes: Array[Byte] currentConsistenHash.nodeFor(bytes)
case str: String currentConsistenHash.nodeFor(str)
case x: AnyRef currentConsistenHash.nodeFor(SerializationExtension(routeeProvider.context.system).serialize(x).get)
}
} catch {
case NonFatal(e)
// serialization failed
log.warning("Couldn't route message with consistent hash key [{}] due to [{}]", hashData, e.getMessage)
routeeProvider.context.system.deadLetters
}
{
case (sender, message)
message match {
case Broadcast(msg) toAll(sender, routeeProvider.routees)
case _ if hashMapping.isDefinedAt(message)
List(Destination(sender, target(hashMapping(message))))
case hashable: ConsistentHashable List(Destination(sender, target(hashable.consistentHashKey)))
case other
log.warning("Message [{}] must be handled by hashMapping, or implement [{}] or be wrapped in [{}]",
message.getClass.getName, classOf[ConsistentHashable].getName,
classOf[ConsistentHashableEnvelope].getName)
List(Destination(sender, routeeProvider.context.system.deadLetters))
}
}
}
}

View file

@ -0,0 +1,149 @@
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
/**
* An implementation of Austin Appleby's MurmurHash 3.0 algorithm
* (32 bit version); reference: http://code.google.com/p/smhasher
*
* This is the hash used by collections and case classes (including
* tuples).
*
* @author Rex Kerr
* @version 2.9
* @since 2.9
*/
package akka.routing
import java.lang.Integer.{ rotateLeft rotl }
/**
* An object designed to generate well-distributed non-cryptographic
* hashes. It is designed to hash a collection of integers; along with
* the integers to hash, it generates two magic streams of integers to
* increase the distribution of repetitive input sequences. Thus,
* three methods need to be called at each step (to start and to
* incorporate a new integer) to update the values. Only one method
* needs to be called to finalize the hash.
*/
object MurmurHash {
// Magic values used for MurmurHash's 32 bit hash.
// Don't change these without consulting a hashing expert!
final private val visibleMagic: Int = 0x971e137b
final private val hiddenMagicA: Int = 0x95543787
final private val hiddenMagicB: Int = 0x2ad7eb25
final private val visibleMixer: Int = 0x52dce729
final private val hiddenMixerA: Int = 0x7b7d159c
final private val hiddenMixerB: Int = 0x6bce6396
final private val finalMixer1: Int = 0x85ebca6b
final private val finalMixer2: Int = 0xc2b2ae35
// Arbitrary values used for hashing certain classes
final private val seedString: Int = 0xf7ca7fd2
final private val seedArray: Int = 0x3c074a61
/** The first 23 magic integers from the first stream are stored here */
private val storedMagicA: Array[Int] =
Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray
/** The first 23 magic integers from the second stream are stored here */
private val storedMagicB: Array[Int] =
Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray
/** Begin a new hash with a seed value. */
def startHash(seed: Int): Int = seed ^ visibleMagic
/** The initial magic integers in the first stream. */
def startMagicA: Int = hiddenMagicA
/** The initial magic integer in the second stream. */
def startMagicB: Int = hiddenMagicB
/**
* Incorporates a new value into an existing hash.
*
* @param hash the prior hash value
* @param value the new value to incorporate
* @param magicA a magic integer from the stream
* @param magicB a magic integer from a different stream
* @return the updated hash value
*/
def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int =
(hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer
/** Given a magic integer from the first stream, compute the next */
def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA
/** Given a magic integer from the second stream, compute the next */
def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB
/** Once all hashes have been incorporated, this performs a final mixing */
def finalizeHash(hash: Int): Int = {
var i = (hash ^ (hash >>> 16))
i *= finalMixer1
i ^= (i >>> 13)
i *= finalMixer2
i ^= (i >>> 16)
i
}
/** Compute a high-quality hash of an array */
def arrayHash[@specialized T](a: Array[T]): Int = {
var h = startHash(a.length * seedArray)
var c = hiddenMagicA
var k = hiddenMagicB
var j = 0
while (j < a.length) {
h = extendHash(h, a(j).##, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
j += 1
}
finalizeHash(h)
}
/** Compute a high-quality hash of a string */
def stringHash(s: String): Int = {
var h = startHash(s.length * seedString)
var c = hiddenMagicA
var k = hiddenMagicB
var j = 0
while (j + 1 < s.length) {
val i = (s.charAt(j) << 16) + s.charAt(j + 1);
h = extendHash(h, i, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
j += 2
}
if (j < s.length) h = extendHash(h, s.charAt(j), c, k)
finalizeHash(h)
}
/**
* Compute a hash that is symmetric in its arguments--that is,
* where the order of appearance of elements does not matter.
* This is useful for hashing sets, for example.
*/
def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = {
var a, b, n = 0
var c = 1
xs.foreach(i {
val h = i.##
a += h
b ^= h
if (h != 0) c *= h
n += 1
})
var h = startHash(seed * n)
h = extendHash(h, a, storedMagicA(0), storedMagicB(0))
h = extendHash(h, b, storedMagicA(1), storedMagicB(1))
h = extendHash(h, c, storedMagicA(2), storedMagicB(2))
finalizeHash(h)
}
}

View file

@ -5,7 +5,6 @@ package akka.routing
import language.implicitConversions
import language.postfixOps
import akka.actor._
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
@ -19,6 +18,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.dispatch.Dispatchers
import scala.annotation.tailrec
import concurrent.ExecutionContext
import scala.concurrent.util.FiniteDuration
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -115,8 +115,8 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
val s = if (sender eq null) system.deadLetters else sender
val msg = message match {
case Broadcast(m) m
case m m
case wrapped: RouterEnvelope wrapped.message
case m m
}
applyRoute(s, message) match {
@ -283,7 +283,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
* The reason for the delay is to give concurrent messages a chance to be
* placed in mailbox before sending PoisonPill.
*/
def removeRoutees(nrOfInstances: Int, stopDelay: Duration): Unit = {
def removeRoutees(nrOfInstances: Int, stopDelay: FiniteDuration): Unit = {
if (nrOfInstances <= 0) {
throw new IllegalArgumentException("Expected positive nrOfInstances, got [%s]".format(nrOfInstances))
} else if (nrOfInstances > 0) {
@ -298,7 +298,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi
* Give concurrent messages a chance to be placed in mailbox before
* sending PoisonPill.
*/
protected def delayedStop(scheduler: Scheduler, abandon: Iterable[ActorRef], stopDelay: Duration): Unit = {
protected def delayedStop(scheduler: Scheduler, abandon: Iterable[ActorRef], stopDelay: FiniteDuration): Unit = {
if (abandon.nonEmpty) {
if (stopDelay <= Duration.Zero) {
abandon foreach (_ ! PoisonPill)
@ -400,7 +400,15 @@ private object Router {
* Router implementations may choose to handle this message differently.
*/
@SerialVersionUID(1L)
case class Broadcast(message: Any)
case class Broadcast(message: Any) extends RouterEnvelope
/**
* Only the contained message will be forwarded to the
* destination, i.e. the envelope will be stripped off.
*/
trait RouterEnvelope {
def message: Any
}
/**
* Sending this message to a router will make it send back its currently used routees.
@ -588,6 +596,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait RoundRobinLike { this: RouterConfig
def nrOfInstances: Int
@ -721,6 +733,10 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait RandomLike { this: RouterConfig
def nrOfInstances: Int
@ -861,6 +877,10 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait SmallestMailboxLike { this: RouterConfig
def nrOfInstances: Int
@ -1076,6 +1096,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait BroadcastLike { this: RouterConfig
def nrOfInstances: Int
@ -1098,13 +1122,13 @@ object ScatterGatherFirstCompletedRouter {
/**
* Creates a new ScatterGatherFirstCompletedRouter, routing to the specified routees, timing out after the specified Duration
*/
def apply(routees: Iterable[ActorRef], within: Duration): ScatterGatherFirstCompletedRouter =
def apply(routees: Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter =
new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within)
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef], within: Duration): ScatterGatherFirstCompletedRouter = {
def create(routees: java.lang.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter = {
import scala.collection.JavaConverters._
apply(routees.asScala, within)
}
@ -1153,7 +1177,7 @@ object ScatterGatherFirstCompletedRouter {
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: FiniteDuration,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
@ -1166,7 +1190,7 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int, w: Duration) = this(nrOfInstances = nr, within = w)
def this(nr: Int, w: FiniteDuration) = this(nrOfInstances = nr, within = w)
/**
* Constructor that sets the routees to be used.
@ -1174,14 +1198,14 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String], w: Duration) =
def this(routeePaths: java.lang.Iterable[String], w: FiniteDuration) =
this(routees = iterableAsScalaIterable(routeePaths), within = w)
/**
* Constructor that sets the resizer to be used.
* Java API
*/
def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w)
def this(resizer: Resizer, w: FiniteDuration) = this(resizer = Some(resizer), within = w)
/**
* Java API for setting routerDispatcher
@ -1205,13 +1229,17 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait ScatterGatherFirstCompletedLike { this: RouterConfig
def nrOfInstances: Int
def routees: Iterable[String]
def within: Duration
def within: FiniteDuration
def createRoute(routeeProvider: RouteeProvider): Route = {
if (resizer.isEmpty) {
@ -1332,7 +1360,7 @@ case class DefaultResizer(
* messages a chance to be placed in mailbox before sending PoisonPill.
* Use 0 seconds to skip delay.
*/
stopDelay: Duration = 1.second,
stopDelay: FiniteDuration = 1.second,
/**
* Number of messages between resize operation.
* Use 1 to resize before each message.

View file

@ -9,7 +9,6 @@ import com.typesafe.config.Config
import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess }
import akka.event.Logging
import java.util.concurrent.ConcurrentHashMap
import scala.util.control.NonFatal
import scala.collection.mutable.ArrayBuffer
import java.io.NotSerializableException
import util.{ Try, DynamicVariable }
@ -100,8 +99,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
// bindings are ordered from most specific to least specific
def unique(possibilities: Seq[(Class[_], Serializer)]): Boolean =
possibilities.size == 1 ||
(possibilities map (_._1) forall (_ isAssignableFrom possibilities(0)._1)) ||
(possibilities map (_._2) forall (_ == possibilities(0)._2))
(possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
(possibilities forall (_._2 == possibilities(0)._2))
val ser = bindings filter { _._1 isAssignableFrom clazz } match {
case Seq()

View file

@ -8,10 +8,10 @@ import language.implicitConversions
import java.util.concurrent.TimeUnit
import java.lang.{ Double JDouble }
import scala.concurrent.util.Duration
import scala.concurrent.util.{ Duration, FiniteDuration }
@SerialVersionUID(1L)
case class Timeout(duration: Duration) {
case class Timeout(duration: FiniteDuration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
@ -26,17 +26,10 @@ object Timeout {
*/
val zero: Timeout = new Timeout(Duration.Zero)
/**
* A Timeout with infinite duration. Will never timeout. Use extreme caution with this
* as it may cause memory leaks, blocked threads, or may not even be supported by
* the receiver, which would result in an exception.
*/
val never: Timeout = new Timeout(Duration.Inf)
def apply(timeout: Long): Timeout = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit): Timeout = new Timeout(length, unit)
implicit def durationToTimeout(duration: Duration): Timeout = new Timeout(duration)
implicit def durationToTimeout(duration: FiniteDuration): Timeout = new Timeout(duration)
implicit def intToTimeout(timeout: Int): Timeout = new Timeout(timeout)
implicit def longToTimeout(timeout: Long): Timeout = new Timeout(timeout)
}

View file

@ -11,6 +11,7 @@ import akka.util.Timeout
import scala.concurrent.stm._
import concurrent.{ ExecutionContext, Future, Promise, Await }
import concurrent.util.Duration
import scala.concurrent.util.FiniteDuration
/**
* Used internally to send functions.
@ -240,7 +241,7 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
* Dispatch a function to update the internal state, and return a Future where that new state can be obtained
* within the given timeout
*/
def alter(f: JFunc[T, T], timeout: Duration): Future[T] = alter(x f(x))(timeout)
def alter(f: JFunc[T, T], timeout: FiniteDuration): Future[T] = alter(x f(x))(timeout)
/**
* Java API:
@ -259,7 +260,7 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
*/
def alterOff(f: JFunc[T, T], timeout: Duration, ec: ExecutionContext): Unit = alterOff(x f(x))(Timeout(timeout), ec)
def alterOff(f: JFunc[T, T], timeout: FiniteDuration, ec: ExecutionContext): Unit = alterOff(x f(x))(Timeout(timeout), ec)
/**
* Java API:

View file

@ -10,6 +10,7 @@ import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.pattern._
import scala.concurrent.util.Duration
import concurrent.{ ExecutionContext, Future }
import scala.concurrent.util.FiniteDuration
/**
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
@ -27,7 +28,7 @@ trait Activation {
* @param endpoint the endpoint to be activated
* @param timeout the timeout for the Future
*/
def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] =
def activationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointActivated(`endpoint`) endpoint
case EndpointFailedToActivate(`endpoint`, cause) throw cause
@ -40,7 +41,7 @@ trait Activation {
* @param endpoint the endpoint to be deactivated
* @param timeout the timeout of the Future
*/
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] =
def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] =
(activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({
case EndpointDeActivated(`endpoint`) endpoint
case EndpointFailedToDeActivate(`endpoint`, cause) throw cause

View file

@ -5,12 +5,12 @@
package akka.camel
import language.postfixOps
import internal.component.DurationTypeConverter
import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition }
import akka.actor._
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import scala.concurrent.util.FiniteDuration
/**
* Mixed in by Actor implementations that consume message from Camel endpoints.
@ -41,7 +41,7 @@ trait ConsumerConfig { this: CamelSupport ⇒
/**
* How long the actor should wait for activation before it fails.
*/
def activationTimeout: Duration = camel.settings.activationTimeout
def activationTimeout: FiniteDuration = camel.settings.activationTimeout
/**
* When endpoint is out-capable (can produce responses) replyTimeout is the maximum time

View file

@ -7,9 +7,7 @@ package akka.camel.internal
import akka.camel._
import component.CamelPath
import java.io.InputStream
import org.apache.camel.builder.RouteBuilder
import akka.actor._
import collection.mutable
import org.apache.camel.model.RouteDefinition
@ -17,6 +15,7 @@ import org.apache.camel.CamelContext
import scala.concurrent.util.Duration
import concurrent.Await
import akka.util.Timeout
import scala.concurrent.util.FiniteDuration
/**
* For internal use only.
@ -38,7 +37,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒
* @param activationTimeout the timeout for activation
* @return the actorRef to the consumer
*/
private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = {
private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: FiniteDuration) = {
idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer)
Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout)
}

View file

@ -8,8 +8,8 @@ import akka.event.Logging
import akka.camel.{ CamelSettings, Camel }
import scala.util.control.NonFatal
import scala.concurrent.util.Duration
import org.apache.camel.{ ProducerTemplate, CamelContext }
import scala.concurrent.util.FiniteDuration
/**
* For internal use only.
@ -32,7 +32,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel {
ctx.setName(system.name)
ctx.setStreamCaching(true)
ctx.addComponent("akka", new ActorComponent(this, system))
ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter)
ctx.getTypeConverterRegistry.addTypeConverter(classOf[FiniteDuration], classOf[String], DurationTypeConverter)
ctx
}

View file

@ -5,15 +5,11 @@
package akka.camel.internal.component
import language.postfixOps
import java.util.{ Map JMap }
import org.apache.camel._
import org.apache.camel.impl.{ DefaultProducer, DefaultEndpoint, DefaultComponent }
import akka.actor._
import akka.pattern._
import scala.reflect.BeanProperty
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
@ -25,6 +21,7 @@ import akka.camel.internal.CamelExchangeAdapter
import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage }
import support.TypeConverterSupport
import scala.util.{ Failure, Success, Try }
import scala.concurrent.util.FiniteDuration
/**
* For internal use only.
@ -98,7 +95,7 @@ private[camel] trait ActorEndpointConfig {
def path: ActorEndpointPath
def camel: Camel
@BeanProperty var replyTimeout: Duration = camel.settings.replyTimeout
@BeanProperty var replyTimeout: FiniteDuration = camel.settings.replyTimeout
@BeanProperty var autoAck: Boolean = camel.settings.autoAck
}

View file

@ -4,20 +4,22 @@
package akka.camel;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration;
import org.junit.AfterClass;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import scala.concurrent.util.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit.EventFilter;
import static org.junit.Assert.assertEquals;
import akka.testkit.JavaTestKit;
/**
@ -37,7 +39,7 @@ public class ConsumerJavaTestBase {
new JavaTestKit(system) {{
String result = new EventFilter<String>(Exception.class) {
protected String run() {
Duration timeout = Duration.create(1, TimeUnit.SECONDS);
FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS);
Camel camel = CamelExtension.get(system);
ExecutionContext executionContext = system.dispatcher();
try {

View file

@ -7,6 +7,7 @@ import akka.camel.javaapi.UntypedProducerActor;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
@ -59,7 +60,7 @@ public class CustomRouteTestBase {
@Test
public void testCustomConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class);
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext),
@ -73,7 +74,7 @@ public class CustomRouteTestBase {
@Test
public void testCustomAckConsumerRoute() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class);
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(
@ -91,7 +92,7 @@ public class CustomRouteTestBase {
public void testCustomAckConsumerRouteFromUri() throws Exception {
MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class);
ExecutionContext executionContext = system.dispatcher();
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"),
timeout, executionContext),
@ -104,7 +105,7 @@ public class CustomRouteTestBase {
@Test(expected=CamelExecutionException.class)
public void testCustomTimeoutConsumerRoute() throws Exception {
Duration timeout = Duration.create(10, TimeUnit.SECONDS);
FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS);
ExecutionContext executionContext = system.dispatcher();
ActorRef consumer = Await.result(
camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"),

View file

@ -1,7 +1,6 @@
package akka.camel.internal
import language.postfixOps
import org.scalatest.matchers.MustMatchers
import scala.concurrent.util.duration._
import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
@ -9,6 +8,7 @@ import akka.actor.{ Props, ActorSystem }
import scala.concurrent.util.Duration
import akka.camel._
import akka.testkit.{ TimingTest, TestProbe, TestKit }
import scala.concurrent.util.FiniteDuration
class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen {
@ -115,11 +115,11 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w
val probe = TestProbe()
def awaitActivation() = at.tell(AwaitActivation(actor.ref), probe.ref)
def awaitDeActivation() = at.tell(AwaitDeActivation(actor.ref), probe.ref)
def verifyActivated(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointActivated(actor.ref)) }
def verifyDeActivated(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointDeActivated(actor.ref)) }
def verifyActivated(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointActivated(actor.ref)) }
def verifyDeActivated(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointDeActivated(actor.ref)) }
def verifyFailedToActivate(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToActivate(actor.ref, cause)) }
def verifyFailedToDeActivate(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToDeActivate(actor.ref, cause)) }
def verifyFailedToActivate(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToActivate(actor.ref, cause)) }
def verifyFailedToDeActivate(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToDeActivate(actor.ref, cause)) }
}

View file

@ -5,7 +5,6 @@
package akka.camel.internal.component
import language.postfixOps
import org.scalatest.mock.MockitoSugar
import org.mockito.Matchers.any
import org.mockito.Mockito._
@ -27,6 +26,7 @@ import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem.Settings
import akka.event.LoggingAdapter
import akka.testkit.{ TimingTest, TestKit, TestProbe }
import scala.concurrent.util.FiniteDuration
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture {
@ -303,7 +303,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
def msg(s: String) = CamelMessage(s, Map.empty)
def given(actor: ActorRef = probe.ref, outCapable: Boolean = true, autoAck: Boolean = true, replyTimeout: Duration = Int.MaxValue seconds) = {
def given(actor: ActorRef = probe.ref, outCapable: Boolean = true, autoAck: Boolean = true, replyTimeout: FiniteDuration = Int.MaxValue seconds) = {
prepareMocks(actor, outCapable = outCapable)
new ActorProducer(configure(isAutoAck = autoAck, _replyTimeout = replyTimeout), camel)
}
@ -325,16 +325,16 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
callbackReceived.countDown()
}
private[this] def valueWithin(implicit timeout: Duration) =
private[this] def valueWithin(implicit timeout: FiniteDuration) =
if (!callbackReceived.await(timeout.toNanos, TimeUnit.NANOSECONDS)) fail("Callback not received!")
else callbackValue.get
def expectDoneSyncWithin(implicit timeout: Duration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously")
def expectDoneAsyncWithin(implicit timeout: Duration): Unit = if (valueWithin(timeout)) fail("Expected to be done Asynchronously")
def expectDoneSyncWithin(implicit timeout: FiniteDuration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously")
def expectDoneAsyncWithin(implicit timeout: FiniteDuration): Unit = if (valueWithin(timeout)) fail("Expected to be done Asynchronously")
}
def configure(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = {
def configure(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: FiniteDuration = Int.MaxValue seconds) = {
val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel)
endpoint.autoAck = isAutoAck
endpoint.replyTimeout = _replyTimeout

View file

@ -5,7 +5,6 @@
package akka.cluster
import language.implicitConversions
import akka.actor._
import akka.actor.Status._
import akka.ConfigurationException
@ -20,13 +19,12 @@ import scala.concurrent.util.{ Duration, Deadline }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.annotation.tailrec
import scala.collection.immutable.SortedSet
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import akka.util.internal.HashedWheelTimer
import concurrent.{ ExecutionContext, Await }
import scala.concurrent.util.FiniteDuration
/**
* Cluster Extension Id and factory for creating Cluster extension.
@ -111,26 +109,26 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
new Scheduler with Closeable {
override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing
override def schedule(initialDelay: Duration, frequency: Duration,
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, frequency, receiver, message)
systemScheduler.schedule(initialDelay, interval, receiver, message)
override def schedule(initialDelay: Duration, frequency: Duration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, frequency)(f)
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, interval)(f)
override def schedule(initialDelay: Duration, frequency: Duration,
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, frequency, runnable)
systemScheduler.schedule(initialDelay, interval, runnable)
override def scheduleOnce(delay: Duration,
override def scheduleOnce(delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, runnable)
override def scheduleOnce(delay: Duration, receiver: ActorRef,
override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, receiver, message)
override def scheduleOnce(delay: Duration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
override def scheduleOnce(delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay)(f)
}
}
@ -192,6 +190,24 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
def unsubscribe(subscriber: ActorRef): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber)
/**
* Publish current (full) state of the cluster to subscribers,
* that are subscribing to [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or [[akka.cluster.ClusterEvent.CurrentClusterState]].
* If you want this to happen periodically you need to schedule a call to
* this method yourself.
*/
def publishCurrentClusterState(): Unit =
clusterCore ! InternalClusterAction.PublishCurrentClusterState(None)
/**
* Publish current (full) state of the cluster to the specified
* receiver. If you want this to happen periodically you need to schedule
* a call to this method yourself.
*/
def sendCurrentClusterState(receiver: ActorRef): Unit =
clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver))
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.

View file

@ -15,7 +15,7 @@ import akka.cluster.routing.ClusterRouterConfig
import akka.event.EventStream
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteDeployer
import akka.routing.RemoteRouterConfig
import akka.remote.routing.RemoteRouterConfig
import akka.cluster.routing.ClusterRouterSettings
class ClusterActorRefProvider(

View file

@ -16,6 +16,7 @@ import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import language.existentials
import language.postfixOps
import scala.concurrent.util.FiniteDuration
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -106,13 +107,15 @@ private[cluster] object InternalClusterAction {
sealed trait SubscriptionMessage
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage
case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage
/**
* @param receiver if `receiver` is defined the event will only be sent to that
* actor, otherwise it will be sent to all subscribers via the `eventStream`.
*/
case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage
case class PublishChanges(oldGossip: Gossip, newGossip: Gossip)
case object PublishDone
case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage
case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage
}
/**
@ -189,32 +192,32 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
// start periodic gossip to random nodes in cluster
val gossipTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], GossipInterval) {
self ! GossipTick
}
// start periodic heartbeat to all nodes in cluster
val heartbeatTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) {
self ! HeartbeatTick
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) {
self ! ReapUnreachableTick
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsTask =
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], LeaderActionsInterval) {
self ! LeaderActionsTick
}
// start periodic publish of current state
private val publishStateTask: Option[Cancellable] =
if (PublishStatsInterval == Duration.Zero) None
else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) {
else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) {
self ! PublishStatsTick
})
@ -255,7 +258,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
case Remove(address) removing(address)
case SendGossipTo(address) gossipTo(address)
case msg: SubscriptionMessage publisher forward msg
case p: Ping ping(p)
}
@ -831,7 +833,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats)
def ping(p: Ping): Unit = sender ! Pong(p)
}
/**

View file

@ -185,6 +185,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
def receive = {
case PublishChanges(oldGossip, newGossip) publishChanges(oldGossip, newGossip)
case currentStats: CurrentInternalStats publishInternalStats(currentStats)
case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver)
case Subscribe(subscriber, to) subscribe(subscriber, to)
case Unsubscribe(subscriber) unsubscribe(subscriber)
case PublishDone sender ! PublishDone
@ -192,13 +193,21 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
def eventStream: EventStream = context.system.eventStream
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
subscriber ! CurrentClusterState(
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
val state = CurrentClusterState(
members = latestGossip.members,
unreachable = latestGossip.overview.unreachable,
convergence = latestGossip.convergence,
seenBy = latestGossip.seenBy,
leader = latestGossip.leader)
receiver match {
case Some(ref) ref ! state
case None eventStream publish state
}
}
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
publishCurrentClusterState(Some(subscriber))
eventStream.subscribe(subscriber, to)
}

View file

@ -9,14 +9,15 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.actor.{ Scheduler, Cancellable }
import scala.concurrent.util.Duration
import concurrent.ExecutionContext
import scala.concurrent.util.FiniteDuration
/**
* INTERNAL API
*/
private[akka] object FixedRateTask {
def apply(scheduler: Scheduler,
initalDelay: Duration,
delay: Duration)(f: Unit)(implicit executor: ExecutionContext): FixedRateTask =
initalDelay: FiniteDuration,
delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): FixedRateTask =
new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f })
}
@ -28,8 +29,8 @@ private[akka] object FixedRateTask {
* initialDelay.
*/
private[akka] class FixedRateTask(scheduler: Scheduler,
initalDelay: Duration,
delay: Duration,
initalDelay: FiniteDuration,
delay: FiniteDuration,
task: Runnable)(implicit executor: ExecutionContext)
extends Runnable with Cancellable {

View file

@ -28,7 +28,7 @@ import akka.routing.Route
import akka.routing.RouteeProvider
import akka.routing.Router
import akka.routing.RouterConfig
import akka.routing.RemoteRouterConfig
import akka.remote.routing.RemoteRouterConfig
import akka.actor.RootActorPath
import akka.actor.ActorCell
import akka.actor.RelativeActorPath

View file

@ -19,6 +19,7 @@ import akka.remote.testconductor.RoleName
import akka.actor.Props
import akka.actor.Actor
import akka.cluster.MemberStatus._
import scala.concurrent.util.FiniteDuration
object LargeClusterMultiJvmSpec extends MultiNodeConfig {
// each jvm simulates a datacenter with many nodes
@ -123,8 +124,9 @@ abstract class LargeClusterSpec
systems foreach { Cluster(_) }
}
def expectedMaxDuration(totalNodes: Int): Duration =
5.seconds + (2.seconds * totalNodes)
def expectedMaxDuration(totalNodes: Int): FiniteDuration =
// this cast will always succeed, but the compiler does not know about it ...
(5.seconds + (2.seconds * totalNodes)).asInstanceOf[FiniteDuration]
def joinAll(from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = {
val joiningClusters = systems.map(Cluster(_)).toSet
@ -272,7 +274,7 @@ abstract class LargeClusterSpec
val unreachableNodes = nodesPerDatacenter
val liveNodes = nodesPerDatacenter * 4
within(30.seconds + (3.seconds * liveNodes)) {
within((30.seconds + (3.seconds * liveNodes)).asInstanceOf[FiniteDuration]) {
val startGossipCounts = Map.empty[Cluster, Long] ++
systems.map(sys (Cluster(sys) -> Cluster(sys).readView.latestStats.receivedGossipCount))
def gossipCount(c: Cluster): Long = {

View file

@ -4,12 +4,11 @@
package akka.cluster
import language.implicitConversions
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.{ Address, ExtendedActorSystem }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.{STMultiNodeSpec, MultiNodeSpec}
import akka.testkit._
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
@ -18,6 +17,7 @@ import org.scalatest.exceptions.TestFailedException
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorPath
import akka.actor.RootActorPath
import scala.concurrent.util.FiniteDuration
object MultiNodeClusterSpec {
@ -48,7 +48,7 @@ object MultiNodeClusterSpec {
""")
}
trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec
trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeSpec
override def initialParticipants = roles.size
@ -175,7 +175,7 @@ trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec ⇒
def awaitUpConvergence(
numberOfMembers: Int,
canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address],
timeout: Duration = 20.seconds): Unit = {
timeout: FiniteDuration = 20.seconds): Unit = {
within(timeout) {
awaitCond(clusterView.members.size == numberOfMembers)
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))

View file

@ -67,21 +67,11 @@ abstract class TransitionSpec
memberStatus(address) == status
}
def leaderActions(): Unit = {
def leaderActions(): Unit =
cluster.clusterCore ! LeaderActionsTick
awaitPing()
}
def reapUnreachable(): Unit = {
def reapUnreachable(): Unit =
cluster.clusterCore ! ReapUnreachableTick
awaitPing()
}
def awaitPing(): Unit = {
val ping = Ping()
cluster.clusterCore ! ping
expectMsgPF() { case pong @ Pong(`ping`, _) pong }
}
// DSL sugar for `role1 gossipTo role2`
implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role)

View file

@ -6,10 +6,8 @@ package akka.cluster
import language.postfixOps
import language.reflectiveCalls
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.actor.ExtendedActorSystem
@ -17,6 +15,7 @@ import akka.actor.Address
import akka.cluster.InternalClusterAction._
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import akka.actor.ActorRef
object ClusterSpec {
val config = """
@ -45,16 +44,8 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
val cluster = Cluster(system)
def clusterView = cluster.readView
def leaderActions(): Unit = {
def leaderActions(): Unit =
cluster.clusterCore ! LeaderActionsTick
awaitPing()
}
def awaitPing(): Unit = {
val ping = Ping()
cluster.clusterCore ! ping
expectMsgPF() { case pong @ Pong(`ping`, _) pong }
}
"A Cluster" must {
@ -79,7 +70,25 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
clusterView.status must be(MemberStatus.Joining)
clusterView.convergence must be(true)
leaderActions()
clusterView.status must be(MemberStatus.Up)
awaitCond(clusterView.status == MemberStatus.Up)
}
"publish CurrentClusterState to subscribers when requested" in {
try {
cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent])
// first, is in response to the subscription
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
cluster.publishCurrentClusterState()
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
} finally {
cluster.unsubscribe(testActor)
}
}
"send CurrentClusterState to one receiver when requested" in {
cluster.sendCurrentClusterState(testActor)
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
}
}

View file

@ -51,28 +51,7 @@ What do they do?
* Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen`
* These are executed in the :class:`ExecutionContext` provided.
.. graphviz::
digraph circuit_breaker {
rankdir = "LR";
size = "6,5";
graph [ bgcolor = "transparent" ]
node [ fontname = "Helvetica",
fontsize = 14,
shape = circle,
color = white,
style = filled ];
edge [ fontname = "Helvetica", fontsize = 12 ]
Closed [ fillcolor = green2 ];
"Half-Open" [fillcolor = yellow2 ];
Open [ fillcolor = red2 ];
Closed -> Closed [ label = "Success" ];
"Half-Open" -> Open [ label = "Trip Breaker" ];
"Half-Open" -> Closed [ label = "Reset Breaker" ];
Closed -> Open [ label = "Trip Breaker" ];
Open -> Open [ label = "Calls failing fast" ];
Open -> "Half-Open" [ label = "Attempt Reset" ];
}
.. image:: ../images/circuit-breaker-states.png
========
Examples

View file

@ -27,7 +27,7 @@ public class DangerousJavaActor extends UntypedActor {
public DangerousJavaActor() {
this.breaker = new CircuitBreaker(
getContext().dispatcher(), getContext().system().scheduler(),
5, Duration.parse("10s"), Duration.parse("1m"))
5, Duration.create(10, "s"), Duration.create(1, "m"))
.onOpen(new Callable<Object>() {
public Object call() throws Exception {
notifyMeOnOpen();

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.duration;
//#import
import scala.concurrent.util.Duration;
import scala.concurrent.util.Deadline;
//#import
class Java {
public void demo() {
//#dsl
final Duration fivesec = Duration.create(5, "seconds");
final Duration threemillis = Duration.parse("3 millis");
final Duration diff = fivesec.minus(threemillis);
assert diff.lt(fivesec);
assert Duration.Zero().lt(Duration.Inf());
//#dsl
//#deadline
final Deadline deadline = Duration.create(10, "seconds").fromNow();
final Duration rest = deadline.timeLeft();
//#deadline
}
}

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.duration
object Scala {
//#dsl
import scala.concurrent.util.duration._ // notice the small d
val fivesec = 5.seconds
val threemillis = 3.millis
val diff = fivesec - threemillis
assert(diff < fivesec)
val fourmillis = threemillis * 4 / 3 // though you cannot write it the other way around
val n = threemillis / (1 millisecond)
//#dsl
//#deadline
val deadline = 10.seconds.fromNow
// do something
val rest = deadline.timeLeft
//#deadline
}

View file

@ -5,9 +5,9 @@ Duration
########
Durations are used throughout the Akka library, wherefore this concept is
represented by a special data type, :class:`Duration`. Values of this type may
represent infinite (:obj:`Duration.Inf`, :obj:`Duration.MinusInf`) or finite
durations.
represented by a special data type, :class:`scala.concurrent.util.Duration`.
Values of this type may represent infinite (:obj:`Duration.Inf`,
:obj:`Duration.MinusInf`) or finite durations, or be :obj:`Duration.Undefined`.
Finite vs. Infinite
===================
@ -24,18 +24,10 @@ distinguishing the two kinds at compile time:
Scala
=====
In Scala durations are constructable using a mini-DSL and support all expected operations:
In Scala durations are constructable using a mini-DSL and support all expected
arithmetic operations:
.. code-block:: scala
import scala.concurrent.util.duration._ // notice the small d
val fivesec = 5.seconds
val threemillis = 3.millis
val diff = fivesec - threemillis
assert (diff < fivesec)
val fourmillis = threemillis * 4 / 3 // though you cannot write it the other way around
val n = threemillis / (1 millisecond)
.. includecode:: code/docs/duration/Sample.scala#dsl
.. note::
@ -50,26 +42,19 @@ Java
Java provides less syntactic sugar, so you have to spell out the operations as
method calls instead:
.. code-block:: java
final Duration fivesec = Duration.create(5, "seconds");
final Duration threemillis = Duration.parse("3 millis");
final Duration diff = fivesec.minus(threemillis);
assert (diff.lt(fivesec));
assert (Duration.Zero().lt(Duration.Inf()));
.. includecode:: code/docs/duration/Java.java#import
.. includecode:: code/docs/duration/Java.java#dsl
Deadline
========
Durations have a brother name :class:`Deadline`, which is a class holding a representation
Durations have a brother named :class:`Deadline`, which is a class holding a representation
of an absolute point in time, and support deriving a duration from this by calculating the
difference between now and the deadline. This is useful when you want to keep one overall
deadline without having to take care of the book-keeping wrt. the passing of time yourself::
deadline without having to take care of the book-keeping wrt. the passing of time yourself:
val deadline = 10 seconds fromNow
// do something which takes time
awaitCond(..., deadline.timeLeft)
.. includecode:: code/docs/duration/Sample.scala#deadline
In Java you create these from durations::
In Java you create these from durations:
final Deadline d = Duration.create(5, "seconds").fromNow();
.. includecode:: code/docs/duration/Java.java#deadline

View file

@ -8,7 +8,7 @@ import sys, os
# -- General configuration -----------------------------------------------------
sys.path.append(os.path.abspath('_sphinx/exts'))
extensions = ['sphinx.ext.todo', 'includecode', 'sphinx.ext.graphviz']
extensions = ['sphinx.ext.todo', 'includecode']
templates_path = ['_templates']
source_suffix = '.rst'

View file

@ -18,9 +18,7 @@ http://github.com/typesafehub/sbt-multi-jvm
You can add it as a plugin by adding the following to your project/plugins.sbt::
resolvers += Classpaths.typesafeResolver
addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M4")
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.0")
You can then add multi-JVM testing to ``project/Build.scala`` by including the ``MultiJvm``
settings and config. For example, here is an example of how the akka-remote-tests project adds
@ -30,8 +28,8 @@ multi-JVM testing (Simplified for clarity):
import sbt._
import Keys._
import com.typesafe.sbtmultijvm.MultiJvmPlugin
import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions }
import com.typesafe.sbt.SbtMultiJvm
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.{ MultiJvm, extraOptions }
object AkkaBuild extends Build {
@ -49,7 +47,7 @@ multi-JVM testing (Simplified for clarity):
)
) configs (MultiJvm)
lazy val buildSettings = Defaults.defaultSettings ++ Seq(
lazy val buildSettings = Defaults.defaultSettings ++ SbtMultiJvm.multiJvmSettings ++ Seq(
organization := "com.typesafe.akka",
version := "2.1-SNAPSHOT",
scalaVersion := "|scalaVersion|",
@ -78,14 +76,14 @@ the sbt prompt):
.. code-block:: none
akka-remote/multi-jvm:test
akka-remote-tests/multi-jvm:test
Or one can change to the ``akka-remote`` project first, and then run the
Or one can change to the ``akka-remote-tests`` project first, and then run the
tests:
.. code-block:: none
project akka-remote
project akka-remote-tests
multi-jvm:test
To run individual tests use ``test-only``:

View file

@ -210,7 +210,7 @@ Querying the Logical Actor Hierarchy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Since the actor system forms a file-system like hierarchy, matching on paths is
possible in the same was as supported by Unix shells: you may replace (parts
possible in the same way as supported by Unix shells: you may replace (parts
of) path element names with wildcards (`«*»` and `«?»`) to formulate a
selection which may match zero or more actual actors. Because the result is not
a single actor reference, it has a different type :class:`ActorSelection` and

View file

@ -45,8 +45,8 @@ To prevent visibility and reordering problems on actors, Akka guarantees the fol
.. note::
In layman's terms this means that changes to internal fields of the actor is visible when the next message
is processed by that actor. So fields in your actor does not need to be volatile or equivalent.
In layman's terms this means that changes to internal fields of the actor are visible when the next message
is processed by that actor. So fields in your actor need not be volatile or equivalent.
Both rules only apply for the same actor instance and are not valid if different actors are used.

View file

@ -0,0 +1,19 @@
digraph circuit_breaker {
rankdir = "LR";
size = "6,5";
graph [ bgcolor = "transparent" ]
node [ fontname = "Helvetica",
fontsize = 14,
shape = circle,
color = white,
style = filled ];
edge [ fontname = "Helvetica", fontsize = 12 ]
Closed [ fillcolor = green2 ];
"Half-Open" [fillcolor = yellow2 ];
Open [ fillcolor = red2 ];
Closed -> Closed [ label = "Success" ];
"Half-Open" -> Open [ label = "Trip Breaker" ];
"Half-Open" -> Closed [ label = "Reset Breaker" ];
Closed -> Open [ label = "Trip Breaker" ];
Open -> Open [ label = "Calls failing fast" ];
Open -> "Half-Open" [ label = "Attempt Reset" ];

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

View file

@ -109,7 +109,7 @@ public class FaultHandlingDocSample {
*/
public static class Worker extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
final Timeout askTimeout = new Timeout(Duration.parse("5 seconds"));
final Timeout askTimeout = new Timeout(Duration.create(5, "seconds"));
// The sender of the initial Start message will continuously be notified about progress
ActorRef progressListener;
@ -139,7 +139,7 @@ public class FaultHandlingDocSample {
if (msg.equals(Start) && progressListener == null) {
progressListener = getSender();
getContext().system().scheduler().schedule(
Duration.Zero(), Duration.parse("1 second"), getSelf(), Do, getContext().dispatcher()
Duration.Zero(), Duration.create(1, "second"), getSelf(), Do, getContext().dispatcher()
);
} else if (msg.equals(Do)) {
counterService.tell(new Increment(1), getSelf());
@ -299,7 +299,7 @@ public class FaultHandlingDocSample {
counter.tell(new UseStorage(null), getSelf());
// Try to re-establish storage after while
getContext().system().scheduler().scheduleOnce(
Duration.parse("10 seconds"), getSelf(), Reconnect, getContext().dispatcher()
Duration.create(10, "seconds"), getSelf(), Reconnect, getContext().dispatcher()
);
} else if (msg.equals(Reconnect)) {
// Re-establish storage after the scheduled delay

View file

@ -106,7 +106,7 @@ public class FutureDocTestBase {
ActorRef actor = system.actorOf(new Props(MyActor.class));
String msg = "hello";
//#ask-blocking
Timeout timeout = new Timeout(Duration.parse("5 seconds"));
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
//#ask-blocking
@ -518,7 +518,7 @@ public class FutureDocTestBase {
//#after
final ExecutionContext ec = system.dispatcher();
Future<String> failExc = Futures.failed(new IllegalStateException("OHNOES1"));
Future<String> delayed = Patterns.after(Duration.parse("500 millis"),
Future<String> delayed = Patterns.after(Duration.create(500, "millis"),
system.scheduler(), ec, failExc);
Future<String> future = future(new Callable<String>() {
public String call() throws InterruptedException {

View file

@ -0,0 +1,5 @@
package docs.jrouting;
import org.scalatest.junit.JUnitSuite
class ConsistentHashingRouterDocTest extends ConsistentHashingRouterDocTestBase with JUnitSuite

View file

@ -0,0 +1,136 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.jrouting;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.testkit.JavaTestKit;
import akka.actor.ActorSystem;
//#imports1
import akka.actor.UntypedActor;
import akka.routing.ConsistentHashingRouter.ConsistentHashable;
import java.util.Map;
import java.util.HashMap;
import java.io.Serializable;
//#imports1
//#imports2
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.routing.ConsistentHashingRouter;
import akka.routing.ConsistentHashingRouter.ConsistentHashMapper;
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
//#imports2
public class ConsistentHashingRouterDocTestBase {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create();
}
@AfterClass
public static void teardown() {
system.shutdown();
}
//#cache-actor
public static class Cache extends UntypedActor {
Map<String, String> cache = new HashMap<String, String>();
public void onReceive(Object msg) {
if (msg instanceof Entry) {
Entry entry = (Entry) msg;
cache.put(entry.key, entry.value);
} else if (msg instanceof Get) {
Get get = (Get) msg;
Object value = cache.get(get.key);
getSender().tell(value == null ? NOT_FOUND : value,
getContext().self());
} else if (msg instanceof Evict) {
Evict evict = (Evict) msg;
cache.remove(evict.key);
} else {
unhandled(msg);
}
}
}
public static final class Evict implements Serializable {
public final String key;
public Evict(String key) {
this.key = key;
}
}
public static final class Get implements Serializable, ConsistentHashable {
public final String key;
public Get(String key) {
this.key = key;
}
public Object consistentHashKey() {
return key;
}
}
public static final class Entry implements Serializable {
public final String key;
public final String value;
public Entry(String key, String value) {
this.key = key;
this.value = value;
}
}
public static final String NOT_FOUND = "NOT_FOUND";
//#cache-actor
@Test
public void demonstrateUsageOfConsistentHashableRouter() {
new JavaTestKit(system) {{
//#consistent-hashing-router
final ConsistentHashMapper hashMapper = new ConsistentHashMapper() {
@Override
public Object hashKey(Object message) {
if (message instanceof Evict) {
return ((Evict) message).key;
} else {
return null;
}
}
};
ActorRef cache = system.actorOf(new Props(Cache.class).withRouter(
new ConsistentHashingRouter(10).withHashMapper(hashMapper)),
"cache");
cache.tell(new ConsistentHashableEnvelope(
new Entry("hello", "HELLO"), "hello"), getRef());
cache.tell(new ConsistentHashableEnvelope(
new Entry("hi", "HI"), "hi"), getRef());
cache.tell(new Get("hello"), getRef());
expectMsgEquals("HELLO");
cache.tell(new Get("hi"), getRef());
expectMsgEquals("HI");
cache.tell(new Evict("hi"), getRef());
cache.tell(new Get("hi"), getRef());
expectMsgEquals(NOT_FOUND);
//#consistent-hashing-router
}};
}
}

View file

@ -72,7 +72,7 @@ public class CustomRouterDocTestBase {
routedActor.tell(RepublicanVote);
routedActor.tell(DemocratVote);
routedActor.tell(RepublicanVote);
Timeout timeout = new Timeout(Duration.parse("1 seconds"));
Timeout timeout = new Timeout(Duration.create(1, "seconds"));
Future<Object> democratsResult = ask(routedActor, DemocratCountResult, timeout);
Future<Object> republicansResult = ask(routedActor, RepublicanCountResult, timeout);

View file

@ -53,8 +53,8 @@ public class ParentActor extends UntypedActor {
//#scatterGatherFirstCompletedRouter
ActorRef scatterGatherFirstCompletedRouter = getContext().actorOf(
new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration
.parse("2 seconds"))), "router");
Timeout timeout = new Timeout(Duration.parse("5 seconds"));
.create(2, "seconds"))), "router");
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> futureResult = akka.pattern.Patterns.ask(scatterGatherFirstCompletedRouter,
new FibonacciActor.FibonacciNumber(10), timeout);
int result = (Integer) Await.result(futureResult, timeout.duration());

View file

@ -5,7 +5,7 @@ package docs.jrouting;
import akka.routing.RoundRobinRouter;
import akka.routing.DefaultResizer;
import akka.routing.RemoteRouterConfig;
import akka.remote.routing.RemoteRouterConfig;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

View file

@ -98,7 +98,7 @@ public class TestKitDocTest {
//#test-within
new JavaTestKit(system) {{
getRef().tell(42);
new Within(Duration.Zero(), Duration.parse("1 second")) {
new Within(Duration.Zero(), Duration.create(1, "second")) {
// do not put code outside this method, will run afterwards
public void run() {
assertEquals((Integer) 42, expectMsgClass(Integer.class));

View file

@ -187,7 +187,7 @@ public class ZeromqDocTestBase {
@Override
public void preStart() {
getContext().system().scheduler()
.schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK, getContext().dispatcher());
.schedule(Duration.create(1, "second"), Duration.create(1, "second"), getSelf(), TICK, getContext().dispatcher());
}
@Override

View file

@ -15,13 +15,14 @@ is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.SmallestMailboxRouter``
* ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter``
* ``akka.routing.ConsistentHashingRouter``
Routers In Action
^^^^^^^^^^^^^^^^^
This is an example of how to create a router that is defined in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigExample.scala#config
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin
.. includecode:: code/docs/jrouting/RouterViaConfigExample.java#configurableRouting
@ -177,6 +178,10 @@ is exactly what you would expect from a round-robin router to happen.
(The name of an actor is automatically created in the format ``$letter`` unless you specify it -
hence the names printed above.)
This is an example of how to define a round-robin router in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin
RandomRouter
************
As the name implies this router type selects one of its routees randomly and forwards
@ -204,6 +209,10 @@ When run you should see a similar output to this:
The result from running the random router should be different, or at least random, every time you run it.
Try to run it a couple of times to verify its behavior if you don't trust us.
This is an example of how to define a random router in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-random
SmallestMailboxRouter
*********************
A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
@ -219,6 +228,10 @@ Code example:
.. includecode:: code/docs/jrouting/ParentActor.java#smallestMailboxRouter
This is an example of how to define a smallest-mailbox router in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-smallest-mailbox
BroadcastRouter
***************
A broadcast router forwards the message it receives to *all* its routees.
@ -238,6 +251,10 @@ When run you should see a similar output to this:
As you can see here above each of the routees, five in total, received the broadcast message.
This is an example of how to define a broadcast router in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-broadcast
ScatterGatherFirstCompletedRouter
*********************************
The ScatterGatherFirstCompletedRouter will send the message on to all its routees as a future.
@ -255,6 +272,51 @@ When run you should see this:
From the output above you can't really see that all the routees performed the calculation, but they did!
The result you see is from the first routee that returned its calculation to the router.
This is an example of how to define a scatter-gather router in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-scatter-gather
ConsistentHashingRouter
***********************
The ConsistentHashingRouter uses `consistent hashing <http://en.wikipedia.org/wiki/Consistent_hashing>`_
to select a connection based on the sent message. This
`article <http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html>`_ gives good
insight into how consistent hashing is implemented.
There is 3 ways to define what data to use for the consistent hash key.
* You can define ``withHashMapper`` of the router to map incoming
messages to their consistent hash key. This makes the the decision
transparent for the sender.
* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``.
The key is part of the message and it's convenient to define it together
with the message definition.
* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope``
to define what data to use for the consistent hash key. The sender knows
the key to use.
These ways to define the consistent hash key can be use together and at
the same time for one router. The ``withHashMapper`` is tried first.
Code example:
.. includecode:: code/docs/jrouting/ConsistentHashingRouterDocTestBase.java
:include: imports1,cache-actor
.. includecode:: code/docs/jrouting/ConsistentHashingRouterDocTestBase.java
:include: imports2,consistent-hashing-router
In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself,
while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict``
message is handled by the ``withHashMapper``.
This is an example of how to define a consistent-hashing router in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-consistent-hashing
Broadcast Messages
^^^^^^^^^^^^^^^^^^
@ -276,7 +338,7 @@ of routees dynamically.
This is an example of how to create a resizable router that is defined in configuration:
.. includecode:: ../scala/code/docs/routing/RouterViaConfigExample.scala#config-resize
.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-resize
.. includecode:: code/docs/jrouting/RouterViaConfigExample.java#configurableRoutingWithResizer

View file

@ -25,7 +25,7 @@ object DurableMailboxDocSpec {
val config = """
//#dispatcher-config
my-dispatcher {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType
}
//#dispatcher-config
"""

View file

@ -6,7 +6,8 @@
The 2.1 release contains several structural changes that require some
simple, mechanical source-level changes in client code. Several things have
been moved to Scala standard library, such as ``Future``.
been moved to Scala standard library, such as ``Future``, and some package
names have been changed in Remoting and Durable Mailboxes.
When migrating from 1.3.x to 2.1.x you should first follow the instructions for
migrating `1.3.x to 2.0.x <http://doc.akka.io/docs/akka/2.0.3/project/migration-guide-1.3.x-2.0.x.html>`_.
@ -239,7 +240,7 @@ If the target actor of ``akka.pattern.gracefulStop`` isn't terminated within the
timeout the ``Future`` is completed with failure ``akka.pattern.AskTimeoutException``.
In 2.0 it was ``akka.actor.ActorTimeoutException``.
getInstance for singeltons - Java
getInstance for Singletons - Java
====================================
v2.0::
@ -286,7 +287,6 @@ Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sur
stashed messages are put into the dead letters when the actor stops, make sure you call
super.postStop if you override it.
Forward of Terminated message
=============================
@ -358,4 +358,103 @@ v2.1::
else if (requestedCapacity < 0) routeeProvider.removeRoutees(
-requestedCapacity, stopDelay)
Duration and Timeout
====================
The Duration class in the scala library is an improved version of the previous
:class:`akka.util.Duration`. Among others it keeps the static type of
:class:`FiniteDuration` more consistently, which has been used to tighten APIs.
The advantage is that instead of runtime exceptions youll get compiler errors
telling you if you try to pass a possibly non-finite duration where it does not
belong.
The main source incompatibility is that you may have to change the declared
type of fields from ``Duration`` to ``FiniteDuration`` (factory methods already
return the more precise type wherever possible).
Another change is that ``Duration.parse`` was not accepted by the scala-library
maintainers, use ``Duration.create`` instead.
v2.0::
final Duration d = Duration.parse("1 second");
final Timeout t = new Timeout(d);
v2.1::
final FiniteDuration d = Duration.create("1 second");
final Timeout t = new Timeout(d); // always required finite duration, now also in type
Package Name Changes in Remoting
================================
The package name of all classes in the ``akka-remote.jar`` artifact now starts with ``akka.remote``.
This has been done to enable OSGi bundles that don't have conflicting package names.
Change the following import statements. Please note that the serializers are often referenced from configuration.
================================================ =======================================================
Search Replace with
================================================ =======================================================
``akka.routing.RemoteRouterConfig`` ``akka.remote.routing.RemoteRouterConfig``
``akka.serialization.ProtobufSerializer`` ``akka.remote.serialization.ProtobufSerializer``
``akka.serialization.DaemonMsgCreateSerializer`` ``akka.remote.serialization.DaemonMsgCreateSerializer``
================================================ =======================================================
Package Name Changes in Durable Mailboxes
=========================================
The package name of all classes in the ``akka-file-mailbox.jar`` artifact now starts with ``akka.actor.mailbox.filebased``.
This has been done to enable OSGi bundles that don't have conflicting package names.
Change the following import statements. Please note that the ``FileBasedMailboxType`` is often referenced from configuration.
================================================ =========================================================
Search Replace with
================================================ =========================================================
``akka.actor.mailbox.FileBasedMailboxType`` ``akka.actor.mailbox.filebased.FileBasedMailboxType``
``akka.actor.mailbox.FileBasedMailboxSettings`` ``akka.actor.mailbox.filebased.FileBasedMailboxSettings``
``akka.actor.mailbox.FileBasedMessageQueue`` ``akka.actor.mailbox.filebased.FileBasedMessageQueue``
``akka.actor.mailbox.filequeue.*`` ``akka.actor.mailbox.filebased.filequeue.*``
================================================ =========================================================
Actor Receive Timeout
=====================
The API for setting and querying the receive timeout has been made more
consisten in always taking and returning a ``Duration``, the wrapping in
``Option`` has been removed.
(Samples for Java, Scala sources are affected in exactly the same way.)
v2.0::
getContext().setReceiveTimeout(Duration.create(10, SECONDS));
final Option<Duration> timeout = getContext().receiveTimeout();
final isSet = timeout.isDefined();
resetReceiveTimeout();
v2.1::
getContext().setReceiveTimeout(Duration.create(10, SECONDS));
final Duration timeout = getContext().receiveTimeout();
final isSet = timeout.isFinite();
getContext().setReceiveTimeout(Duration.Undefined());
ConsistentHash
==============
``akka.routing.ConsistentHash`` has been changed to an immutable data structure.
v2.0::
val consistentHash = new ConsistentHash(Seq(a1, a2, a3), replicas = 10)
consistentHash += a4
val a = consistentHash.nodeFor(data)
v2.1::
var consistentHash = ConsistentHash(Seq(a1, a2, a3), replicas = 10)
consistentHash = consistentHash :+ a4
val a = consistentHash.nodeFor(data)

View file

@ -51,6 +51,16 @@ be able to handle unknown messages then you need to have a default case as in
the example above. Otherwise an ``akka.actor.UnhandledMessage(message, sender, recipient)`` will be
published to the ``ActorSystem``'s ``EventStream``.
The result of the :meth:`receive` method is a partial function object, which is
stored within the actor as its “initial behavior”, see `Become/Unbecome`_ for
further information on changing the behavior of an actor after its
construction.
.. note::
The initial behavior of an Actor is extracted prior to constructor is run,
so if you want to base your initial behavior on member state, you should
use ``become`` in the constructor.
Creating Actors with default constructor
----------------------------------------

View file

@ -45,7 +45,7 @@ object ExtensionDocSpec {
val config = """
//#config
akka {
extensions = ["docs.extension.CountExtension$"]
extensions = ["docs.extension.CountExtension"]
}
//#config
"""

View file

@ -0,0 +1,73 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.routing
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
object ConsistentHashingRouterDocSpec {
//#cache-actor
import akka.actor.Actor
import akka.routing.ConsistentHashingRouter.ConsistentHashable
class Cache extends Actor {
var cache = Map.empty[String, String]
def receive = {
case Entry(key, value) cache += (key -> value)
case Get(key) sender ! cache.get(key)
case Evict(key) cache -= key
}
}
case class Evict(key: String)
case class Get(key: String) extends ConsistentHashable {
override def consistentHashKey: Any = key
}
case class Entry(key: String, value: String)
//#cache-actor
}
class ConsistentHashingRouterDocSpec extends AkkaSpec with ImplicitSender {
import ConsistentHashingRouterDocSpec._
"demonstrate usage of ConsistentHashableRouter" in {
//#consistent-hashing-router
import akka.actor.Props
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
def hashMapping: ConsistentHashMapping = {
case Evict(key) key
}
val cache = system.actorOf(Props[Cache].withRouter(ConsistentHashingRouter(10,
hashMapping = hashMapping)), name = "cache")
cache ! ConsistentHashableEnvelope(
message = Entry("hello", "HELLO"), hashKey = "hello")
cache ! ConsistentHashableEnvelope(
message = Entry("hi", "HI"), hashKey = "hi")
cache ! Get("hello")
expectMsg(Some("HELLO"))
cache ! Get("hi")
expectMsg(Some("HI"))
cache ! Evict("hi")
cache ! Get("hi")
expectMsg(None)
//#consistent-hashing-router
}
}

View file

@ -0,0 +1,158 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.routing
import akka.actor.{ Actor, Props, ActorSystem, ActorLogging }
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig
import akka.routing.ConsistentHashingRouter.ConsistentHashable
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
object RouterWithConfigDocSpec {
val config = ConfigFactory.parseString("""
//#config-round-robin
akka.actor.deployment {
/myrouter1 {
router = round-robin
nr-of-instances = 5
}
}
//#config-round-robin
//#config-resize
akka.actor.deployment {
/myrouter2 {
router = round-robin
resizer {
lower-bound = 2
upper-bound = 15
}
}
}
//#config-resize
//#config-random
akka.actor.deployment {
/myrouter3 {
router = random
nr-of-instances = 5
}
}
//#config-random
//#config-smallest-mailbox
akka.actor.deployment {
/myrouter4 {
router = smallest-mailbox
nr-of-instances = 5
}
}
//#config-smallest-mailbox
//#config-broadcast
akka.actor.deployment {
/myrouter5 {
router = broadcast
nr-of-instances = 5
}
}
//#config-broadcast
//#config-scatter-gather
akka.actor.deployment {
/myrouter6 {
router = scatter-gather
nr-of-instances = 5
within = 10 seconds
}
}
//#config-scatter-gather
//#config-consistent-hashing
akka.actor.deployment {
/myrouter7 {
router = consistent-hashing
nr-of-instances = 5
virtual-nodes-factor = 10
}
}
//#config-consistent-hashing
""")
case class Message(nbr: Int) extends ConsistentHashable {
override def consistentHashKey = nbr
}
class ExampleActor extends Actor with ActorLogging {
def receive = {
case Message(nbr)
log.debug("Received %s in router %s".format(nbr, self.path.name))
sender ! nbr
}
}
}
class RouterWithConfigDocSpec extends AkkaSpec(RouterWithConfigDocSpec.config) with ImplicitSender {
import RouterWithConfigDocSpec._
"demonstrate configured round-robin router" in {
//#configurableRouting
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"myrouter1")
//#configurableRouting
1 to 10 foreach { i router ! Message(i) }
receiveN(10)
}
"demonstrate configured random router" in {
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"myrouter3")
1 to 10 foreach { i router ! Message(i) }
receiveN(10)
}
"demonstrate configured smallest-mailbox router" in {
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"myrouter4")
1 to 10 foreach { i router ! Message(i) }
receiveN(10)
}
"demonstrate configured broadcast router" in {
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"myrouter5")
1 to 10 foreach { i router ! Message(i) }
receiveN(5 * 10)
}
"demonstrate configured scatter-gather router" in {
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"myrouter6")
1 to 10 foreach { i router ! Message(i) }
receiveN(10)
}
"demonstrate configured consistent-hashing router" in {
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"myrouter7")
1 to 10 foreach { i router ! Message(i) }
receiveN(10)
}
"demonstrate configured round-robin router with resizer" in {
//#configurableRoutingWithResizer
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"myrouter2")
//#configurableRoutingWithResizer
1 to 10 foreach { i router ! Message(i) }
receiveN(10)
}
}

View file

@ -1,52 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.routing
import akka.actor.{ Actor, Props, ActorSystem }
import com.typesafe.config.ConfigFactory
import akka.routing.FromConfig
case class Message(nbr: Int)
class ExampleActor extends Actor {
def receive = {
case Message(nbr) println("Received %s in router %s".format(nbr, self.path.name))
}
}
object RouterWithConfigExample extends App {
val config = ConfigFactory.parseString("""
//#config
akka.actor.deployment {
/router {
router = round-robin
nr-of-instances = 5
}
}
//#config
//#config-resize
akka.actor.deployment {
/router2 {
router = round-robin
resizer {
lower-bound = 2
upper-bound = 15
}
}
}
//#config-resize
""")
val system = ActorSystem("Example", config)
//#configurableRouting
val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"router")
//#configurableRouting
1 to 10 foreach { i router ! Message(i) }
//#configurableRoutingWithResizer
val router2 = system.actorOf(Props[ExampleActor].withRouter(FromConfig()),
"router2")
//#configurableRoutingWithResizer
1 to 10 foreach { i router2 ! Message(i) }
}

View file

@ -6,7 +6,7 @@ package docs.routing
import akka.routing.RoundRobinRouter
import akka.actor.{ ActorRef, Props, Actor, ActorSystem }
import akka.routing.DefaultResizer
import akka.routing.RemoteRouterConfig
import akka.remote.routing.RemoteRouterConfig
case class Message1(nbr: Int)

View file

@ -104,7 +104,7 @@ package docs.serialization {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
myown = "docs.serialization.MyOwnSerializer"
}
}
@ -122,7 +122,7 @@ package docs.serialization {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
myown = "docs.serialization.MyOwnSerializer"
}

View file

@ -58,8 +58,6 @@ in the ``akka.extensions`` section of the config you provide to your ``ActorSyst
.. includecode:: code/docs/extension/ExtensionDocSpec.scala
:include: config
Note that in this case ``CountExtension`` is an object and therefore the class name ends with ``$``.
Applicability
=============

View file

@ -15,15 +15,16 @@ is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.SmallestMailboxRouter``
* ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter``
* ``akka.routing.ConsistentHashingRouter``
Routers In Action
^^^^^^^^^^^^^^^^^
This is an example of how to create a router that is defined in configuration:
.. includecode:: code/docs/routing/RouterViaConfigExample.scala#config
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin
.. includecode:: code/docs/routing/RouterViaConfigExample.scala#configurableRouting
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#configurableRouting
This is an example of how to programmatically create a router and set the number of routees it should create:
@ -125,7 +126,7 @@ not have an effect on the number of actors in the pool.
Setting the strategy is easily done:
.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala#supervision
:include: supervision
:exclude: custom-strategy
@ -179,6 +180,10 @@ is exactly what you would expect from a round-robin router to happen.
(The name of an actor is automatically created in the format ``$letter`` unless you specify it -
hence the names printed above.)
This is an example of how to define a round-robin router in configuration:
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin
RandomRouter
************
As the name implies this router type selects one of its routees randomly and forwards
@ -206,6 +211,10 @@ When run you should see a similar output to this:
The result from running the random router should be different, or at least random, every time you run it.
Try to run it a couple of times to verify its behavior if you don't trust us.
This is an example of how to define a random router in configuration:
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-random
SmallestMailboxRouter
*********************
A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
@ -221,6 +230,11 @@ Code example:
.. includecode:: code/docs/routing/RouterTypeExample.scala#smallestMailboxRouter
This is an example of how to define a smallest-mailbox router in configuration:
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-smallest-mailbox
BroadcastRouter
***************
A broadcast router forwards the message it receives to *all* its routees.
@ -240,6 +254,11 @@ When run you should see a similar output to this:
As you can see here above each of the routees, five in total, received the broadcast message.
This is an example of how to define a broadcast router in configuration:
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-broadcast
ScatterGatherFirstCompletedRouter
*********************************
The ScatterGatherFirstCompletedRouter will send the message on to all its routees as a future.
@ -257,6 +276,51 @@ When run you should see this:
From the output above you can't really see that all the routees performed the calculation, but they did!
The result you see is from the first routee that returned its calculation to the router.
This is an example of how to define a scatter-gather router in configuration:
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-scatter-gather
ConsistentHashingRouter
***********************
The ConsistentHashingRouter uses `consistent hashing <http://en.wikipedia.org/wiki/Consistent_hashing>`_
to select a connection based on the sent message. This
`article <http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html>`_ gives good
insight into how consistent hashing is implemented.
There is 3 ways to define what data to use for the consistent hash key.
* You can define ``hashMapping`` of the router to map incoming
messages to their consistent hash key. This makes the decision
transparent for the sender.
* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``.
The key is part of the message and it's convenient to define it together
with the message definition.
* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope``
to define what data to use for the consistent hash key. The sender knows
the key to use.
These ways to define the consistent hash key can be use together and at
the same time for one router. The ``hashMapping`` is tried first.
Code example:
.. includecode:: code/docs/routing/ConsistentHashingRouterDocSpec.scala#cache-actor
.. includecode:: code/docs/routing/ConsistentHashingRouterDocSpec.scala#consistent-hashing-router
In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself,
while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict``
message is handled by the ``hashMapping`` partial function.
This is an example of how to define a consistent-hashing router in configuration:
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-consistent-hashing
Broadcast Messages
^^^^^^^^^^^^^^^^^^
@ -278,9 +342,9 @@ of routees dynamically.
This is an example of how to create a resizable router that is defined in configuration:
.. includecode:: code/docs/routing/RouterViaConfigExample.scala#config-resize
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-resize
.. includecode:: code/docs/routing/RouterViaConfigExample.scala#configurableRoutingWithResizer
.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#configurableRoutingWithResizer
Several more configuration options are available and described in ``akka.actor.deployment.default.resizer``
section of the reference :ref:`configuration`.

Some files were not shown because too many files have changed in this diff Show more