diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 23a48d5417..bae6d2f6fe 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -5,7 +5,6 @@ package akka.actor import language.postfixOps - import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.testkit._ import TestEvent.Mute @@ -15,6 +14,7 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.Await import akka.util.Timeout import scala.concurrent.util.Duration +import scala.concurrent.util.FiniteDuration object FSMActorSpec { val timeout = Timeout(2 seconds) @@ -33,7 +33,7 @@ object FSMActorSpec { case object Locked extends LockState case object Open extends LockState - class Lock(code: String, timeout: Duration, latches: Latches) extends Actor with FSM[LockState, CodeState] { + class Lock(code: String, timeout: FiniteDuration, latches: Latches) extends Actor with FSM[LockState, CodeState] { import latches._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 72efaef0d4..58ffb9c602 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -5,7 +5,6 @@ package akka.actor import language.postfixOps - import akka.util.ByteString import scala.concurrent.{ ExecutionContext, Await, Future, Promise } import scala.concurrent.util.{ Duration, Deadline } @@ -17,6 +16,7 @@ import akka.pattern.ask import java.net.{ Socket, InetSocketAddress, InetAddress, SocketAddress } import scala.util.Failure import annotation.tailrec +import scala.concurrent.util.FiniteDuration object IOActorSpec { @@ -244,7 +244,10 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { * @param filter determines which exceptions should be retried * @return a future containing the result or the last exception before a limit was hit. */ - def retry[T](count: Option[Int] = None, timeout: Option[Duration] = None, delay: Option[Duration] = Some(100 millis), filter: Option[Throwable ⇒ Boolean] = None)(future: ⇒ Future[T])(implicit executor: ExecutionContext): Future[T] = { + def retry[T](count: Option[Int] = None, + timeout: Option[FiniteDuration] = None, + delay: Option[FiniteDuration] = Some(100 millis), + filter: Option[Throwable ⇒ Boolean] = None)(future: ⇒ Future[T])(implicit executor: ExecutionContext): Future[T] = { val promise = Promise[T]() diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index dc5229ae41..a74cbc9839 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -5,13 +5,12 @@ package akka.actor import language.postfixOps - import akka.testkit._ import scala.concurrent.util.duration._ - import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import java.util.concurrent.TimeoutException +import scala.concurrent.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ReceiveTimeoutSpec extends AkkaSpec { @@ -65,7 +64,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { case ReceiveTimeout ⇒ count.incrementAndGet timeoutLatch.open - context.resetReceiveTimeout() + context.setReceiveTimeout(Duration.Undefined) } })) diff --git a/akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java b/akka-actor/src/main/java/akka/actor/dungeon/AbstractActorCell.java similarity index 65% rename from akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java rename to akka-actor/src/main/java/akka/actor/dungeon/AbstractActorCell.java index 2d8c4fbc1e..6735b6e2cb 100644 --- a/akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java +++ b/akka-actor/src/main/java/akka/actor/dungeon/AbstractActorCell.java @@ -2,7 +2,7 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor.cell; +package akka.actor.dungeon; import akka.actor.ActorCell; import akka.util.Unsafe; @@ -14,9 +14,9 @@ final class AbstractActorCell { static { try { - mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Dispatch$$_mailboxDoNotCallMeDirectly")); - childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_childrenRefsDoNotCallMeDirectly")); - nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_nextNameDoNotCallMeDirectly")); + mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$dungeon$Dispatch$$_mailboxDoNotCallMeDirectly")); + childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$dungeon$Children$$_childrenRefsDoNotCallMeDirectly")); + nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$dungeon$Children$$_nextNameDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java index cc4328d763..1630f599ee 100644 --- a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java +++ b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import scala.concurrent.util.Duration; +import scala.concurrent.util.FiniteDuration; import akka.event.LoggingAdapter; import akka.util.Unsafe; @@ -241,7 +242,7 @@ public class HashedWheelTimer implements Timer { return new HashedWheelTimeout(this, task, time); } - public Timeout newTimeout(TimerTask task, Duration delay) { + public Timeout newTimeout(TimerTask task, FiniteDuration delay) { final long currentTime = System.nanoTime(); if (task == null) { diff --git a/akka-actor/src/main/java/akka/util/internal/Timer.java b/akka-actor/src/main/java/akka/util/internal/Timer.java index 7086aef9c6..be7656ec6c 100644 --- a/akka-actor/src/main/java/akka/util/internal/Timer.java +++ b/akka-actor/src/main/java/akka/util/internal/Timer.java @@ -15,9 +15,9 @@ */ package akka.util.internal; -import scala.concurrent.util.Duration; import java.util.Set; -import java.util.concurrent.TimeUnit; + +import scala.concurrent.util.FiniteDuration; /** * Schedules {@link TimerTask}s for one-time future execution in a background @@ -42,7 +42,7 @@ public interface Timer { * @throws IllegalStateException if this timer has been * {@linkplain #stop() stopped} already */ - Timeout newTimeout(TimerTask task, Duration delay); + Timeout newTimeout(TimerTask task, FiniteDuration delay); /** * Releases all resources acquired by this {@link Timer} and cancels all diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c34b9ac05e..2c12dae8e4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -4,13 +4,13 @@ package akka.actor -import cell.ChildrenContainer.{ WaitingForChildren } import java.io.{ ObjectOutputStream, NotSerializableException } import scala.annotation.tailrec import scala.collection.immutable.TreeSet import scala.concurrent.util.Duration import scala.util.control.NonFatal -import akka.actor.cell.ChildrenContainer +import akka.actor.dungeon.ChildrenContainer +import akka.actor.dungeon.ChildrenContainer.WaitingForChildren import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated } import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure @@ -55,23 +55,25 @@ trait ActorContext extends ActorRefFactory { def props: Props /** - * Gets the current receive timeout + * Gets the current receive timeout. * When specified, the receive method should be able to handle a 'ReceiveTimeout' message. */ - def receiveTimeout: Option[Duration] + def receiveTimeout: Duration /** - * Defines the default timeout for an initial receive invocation. + * Defines the inactivity timeout after which the sending of a `ReceiveTimeout` message is triggered. * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. * 1 millisecond is the minimum supported timeout. + * + * Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after + * another message was enqueued; hence it is '''not guaranteed''' that upon reception of the receive + * timeout there must have been an idle period beforehand as configured via this method. + * + * Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity + * periods). Pass in `Duration.Undefined` to switch off this feature. */ def setReceiveTimeout(timeout: Duration): Unit - /** - * Clears the receive timeout, i.e. deactivates this feature. - */ - def resetReceiveTimeout(): Unit - /** * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. * Puts the behavior on top of the hotswap stack. @@ -290,11 +292,11 @@ private[akka] class ActorCell( val props: Props, val parent: InternalActorRef) extends UntypedActorContext with Cell - with cell.ReceiveTimeout - with cell.Children - with cell.Dispatch - with cell.DeathWatch - with cell.FaultHandling { + with dungeon.ReceiveTimeout + with dungeon.Children + with dungeon.Dispatch + with dungeon.DeathWatch + with dungeon.FaultHandling { import ActorCell._ diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index bce966b99e..3d3c4b83d4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -17,7 +17,7 @@ import akka.util._ import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.cell.ChildrenContainer +import akka.actor.dungeon.ChildrenContainer import scala.concurrent.util.FiniteDuration import util.{ Failure, Success } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 51d0c290dc..a58abb0ac3 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -4,11 +4,11 @@ package akka.actor import language.implicitConversions - import akka.util._ import scala.concurrent.util.Duration import scala.collection.mutable import akka.routing.{ Deafen, Listen, Listeners } +import scala.concurrent.util.FiniteDuration object FSM { @@ -92,7 +92,7 @@ object FSM { private val scheduler = context.system.scheduler private implicit val executionContext = context.dispatcher - def schedule(actor: ActorRef, timeout: Duration): Unit = + def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = ref = Some( if (repeat) scheduler.schedule(timeout, timeout, actor, this) else scheduler.scheduleOnce(timeout, actor, this)) @@ -121,15 +121,18 @@ object FSM { * name, the state data, possibly custom timeout, stop reason and replies * accumulated while processing the last message. */ - case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) { + case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) { /** * Modify state transition descriptor to include a state timeout for the * next state. This timeout overrides any default timeout set for the next * state. + * + * Use Duration.Inf to deactivate an existing timeout. */ - def forMax(timeout: Duration): State[S, D] = { - copy(timeout = Some(timeout)) + def forMax(timeout: Duration): State[S, D] = timeout match { + case f: FiniteDuration ⇒ copy(timeout = Some(f)) + case _ ⇒ copy(timeout = None) } /** @@ -245,7 +248,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { type State = FSM.State[S, D] type StateFunction = scala.PartialFunction[Event, State] - type Timeout = Option[Duration] + type Timeout = Option[FiniteDuration] type TransitionHandler = PartialFunction[(S, S), Unit] /* @@ -279,7 +282,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { * @param stateTimeout default state timeout for this state * @param stateFunction partial function describing response to input */ - final def when(stateName: S, stateTimeout: Duration = null)(stateFunction: StateFunction): Unit = + final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit = register(stateName, stateFunction, Option(stateTimeout)) /** @@ -339,7 +342,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { * @param repeat send once if false, scheduleAtFixedRate if true * @return current state descriptor */ - final def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = { + final def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean): State = { if (debugEvent) log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) if (timers contains name) { diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 3f0539f123..8ca8ab5cb7 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -12,7 +12,7 @@ import scala.annotation.tailrec import scala.collection.mutable.Queue import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.actor.cell.ChildrenContainer +import akka.actor.dungeon.ChildrenContainer import akka.dispatch.{ Envelope, Supervise, SystemMessage, Terminate } import akka.event.Logging.Warning import akka.util.Unsafe diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 42f2a10604..c210bc0976 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import akka.util.internal._ import concurrent.ExecutionContext +import scala.concurrent.util.FiniteDuration //#scheduler /** @@ -29,13 +30,13 @@ trait Scheduler { * Schedules a message to be sent repeatedly with an initial delay and * frequency. E.g. if you would like a message to be sent immediately and * thereafter every 500ms you would set delay=Duration.Zero and - * frequency=Duration(500, TimeUnit.MILLISECONDS) + * interval=Duration(500, TimeUnit.MILLISECONDS) * * Java & Scala API */ def schedule( - initialDelay: Duration, - frequency: Duration, + initialDelay: FiniteDuration, + interval: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable @@ -43,23 +44,23 @@ trait Scheduler { * Schedules a function to be run repeatedly with an initial delay and a * frequency. E.g. if you would like the function to be run after 2 seconds * and thereafter every 100ms you would set delay = Duration(2, TimeUnit.SECONDS) - * and frequency = Duration(100, TimeUnit.MILLISECONDS) + * and interval = Duration(100, TimeUnit.MILLISECONDS) * * Scala API */ def schedule( - initialDelay: Duration, frequency: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable + initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable /** * Schedules a function to be run repeatedly with an initial delay and * a frequency. E.g. if you would like the function to be run after 2 * seconds and thereafter every 100ms you would set delay = Duration(2, - * TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) + * TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS) * * Java API */ def schedule( - initialDelay: Duration, frequency: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable + initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable /** * Schedules a Runnable to be run once with a delay, i.e. a time period that @@ -67,7 +68,7 @@ trait Scheduler { * * Java & Scala API */ - def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable + def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable /** * Schedules a message to be sent once with a delay, i.e. a time period that has @@ -75,7 +76,7 @@ trait Scheduler { * * Java & Scala API */ - def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable + def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable /** * Schedules a function to be run once with a delay, i.e. a time period that has @@ -83,7 +84,7 @@ trait Scheduler { * * Scala API */ - def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable + def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable } //#scheduler @@ -120,8 +121,8 @@ trait Cancellable { * returned from stop(). */ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) extends Scheduler with Closeable { - override def schedule(initialDelay: Duration, - delay: Duration, + override def schedule(initialDelay: FiniteDuration, + delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = { val continuousCancellable = new ContinuousCancellable @@ -142,12 +143,12 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) initialDelay)) } - override def schedule(initialDelay: Duration, - delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = + override def schedule(initialDelay: FiniteDuration, + delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = schedule(initialDelay, delay, new Runnable { override def run = f }) - override def schedule(initialDelay: Duration, - delay: Duration, + override def schedule(initialDelay: FiniteDuration, + delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { val continuousCancellable = new ContinuousCancellable continuousCancellable.init( @@ -163,20 +164,20 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) initialDelay)) } - override def scheduleOnce(delay: Duration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = new DefaultCancellable( hashedWheelTimer.newTimeout( new TimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) }, delay)) - override def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = + override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = scheduleOnce(delay, new Runnable { override def run = receiver ! message }) - override def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = + override def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = scheduleOnce(delay, new Runnable { override def run = f }) private trait ContinuousScheduling { this: TimerTask ⇒ - def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) { + def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) { try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException ⇒ } // stop recurring if timer is stopped } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala b/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala deleted file mode 100644 index c04d485262..0000000000 --- a/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.actor.cell - -import ReceiveTimeout.emptyReceiveTimeoutData -import akka.actor.ActorCell -import akka.actor.ActorCell.emptyCancellable -import akka.actor.Cancellable -import scala.concurrent.util.Duration - -private[akka] object ReceiveTimeout { - final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable) -} - -private[akka] trait ReceiveTimeout { this: ActorCell ⇒ - - import ReceiveTimeout._ - import ActorCell._ - - private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData - - final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match { - case Duration.Undefined ⇒ None - case duration ⇒ Some(duration) - } - - final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined)) - - final def setReceiveTimeout(timeout: Duration): Unit = - receiveTimeoutData = ( - if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout, - receiveTimeoutData._2) - - final def resetReceiveTimeout(): Unit = setReceiveTimeout(None) - - final def checkReceiveTimeout() { - val recvtimeout = receiveTimeoutData - if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) { - recvtimeout._2.cancel() //Cancel any ongoing future - //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout)(this.dispatcher)) - } else cancelReceiveTimeout() - - } - - final def cancelReceiveTimeout(): Unit = - if (receiveTimeoutData._2 ne emptyCancellable) { - receiveTimeoutData._2.cancel() - receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable) - } - -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala index e01e87262d..7b1a77bc71 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -129,10 +129,10 @@ trait Inbox { this: ActorDSL.type ⇒ val next = clientsByTimeout.head.deadline import context.dispatcher if (currentDeadline.isEmpty) { - currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) + currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft.asInstanceOf[FiniteDuration], self, Kick))) } else if (currentDeadline.get._1 != next) { currentDeadline.get._2.cancel() - currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) + currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft.asInstanceOf[FiniteDuration], self, Kick))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala similarity index 99% rename from akka-actor/src/main/scala/akka/actor/cell/Children.scala rename to akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 7fa8eceece..85dfd7095a 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor.cell +package akka.actor.dungeon import scala.annotation.tailrec import scala.collection.JavaConverters.asJavaIterableConverter diff --git a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala similarity index 99% rename from akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala rename to akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala index d0bf76953a..eeb28cf018 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor.cell +package akka.actor.dungeon import scala.collection.immutable.TreeMap diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala similarity index 99% rename from akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala rename to akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index f994e956c6..5407afc2c8 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor.cell +package akka.actor.dungeon import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated } import akka.dispatch.{ Watch, Unwatch } diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala similarity index 99% rename from akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala rename to akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 0f3619e208..e071c1605d 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor.cell +package akka.actor.dungeon import scala.annotation.tailrec import akka.actor.{ ActorRef, ActorCell } diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala similarity index 98% rename from akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala rename to akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 383db82b70..f7c06032c8 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor.cell +package akka.actor.dungeon import scala.annotation.tailrec import akka.actor.{ PreRestartException, PostRestartException, InternalActorRef, Failed, ActorRef, ActorInterruptedException, ActorCell, Actor } @@ -16,6 +16,7 @@ import akka.actor.PreRestartException import akka.actor.Failed import akka.actor.PostRestartException import akka.event.Logging.Debug +import scala.concurrent.util.Duration private[akka] trait FaultHandling { this: ActorCell ⇒ @@ -121,7 +122,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status) assert(perpetrator == self) - setReceiveTimeout(None) + setReceiveTimeout(Duration.Undefined) cancelReceiveTimeout // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) @@ -137,7 +138,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ } protected def terminate() { - setReceiveTimeout(None) + setReceiveTimeout(Duration.Undefined) cancelReceiveTimeout // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/ReceiveTimeout.scala b/akka-actor/src/main/scala/akka/actor/dungeon/ReceiveTimeout.scala new file mode 100644 index 0000000000..0c3661b59a --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/dungeon/ReceiveTimeout.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.dungeon + +import ReceiveTimeout.emptyReceiveTimeoutData +import akka.actor.ActorCell +import akka.actor.ActorCell.emptyCancellable +import akka.actor.Cancellable +import scala.concurrent.util.Duration +import scala.concurrent.util.FiniteDuration + +private[akka] object ReceiveTimeout { + final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable) +} + +private[akka] trait ReceiveTimeout { this: ActorCell ⇒ + + import ReceiveTimeout._ + import ActorCell._ + + private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData + + final def receiveTimeout: Duration = receiveTimeoutData._1 + + final def setReceiveTimeout(timeout: Duration): Unit = receiveTimeoutData = receiveTimeoutData.copy(_1 = timeout) + + final def checkReceiveTimeout() { + val recvtimeout = receiveTimeoutData + //Only reschedule if desired and there are currently no more messages to be processed + if (!mailbox.hasMessages) recvtimeout._1 match { + case f: FiniteDuration ⇒ + recvtimeout._2.cancel() //Cancel any ongoing future + val task = system.scheduler.scheduleOnce(f, self, akka.actor.ReceiveTimeout)(this.dispatcher) + receiveTimeoutData = (f, task) + case _ ⇒ cancelReceiveTimeout() + } + else cancelReceiveTimeout() + + } + + final def cancelReceiveTimeout(): Unit = + if (receiveTimeoutData._2 ne emptyCancellable) { + receiveTimeoutData._2.cancel() + receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable) + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 340195d1a6..eeff39f2e6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -16,6 +16,7 @@ import scala.concurrent.forkjoin.{ ForkJoinTask, ForkJoinPool } import scala.concurrent.util.Duration import scala.concurrent.{ ExecutionContext, Await, Awaitable } import scala.util.control.NonFatal +import scala.concurrent.util.FiniteDuration final case class Envelope private (val message: Any, val sender: ActorRef) @@ -316,7 +317,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext * * INTERNAL API */ - protected[akka] def shutdownTimeout: Duration + protected[akka] def shutdownTimeout: FiniteDuration /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 3897027d9b..c90048c80b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -11,6 +11,7 @@ import akka.util.Helpers import java.util.{ Comparator, Iterator } import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import akka.actor.ActorSystemImpl +import scala.concurrent.util.FiniteDuration /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -33,7 +34,7 @@ class BalancingDispatcher( throughputDeadlineTime: Duration, mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, - _shutdownTimeout: Duration, + _shutdownTimeout: FiniteDuration, attemptTeamWork: Boolean) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 21f4612750..96166022f8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -12,6 +12,7 @@ import java.util.concurrent.{ ExecutorService, RejectedExecutionException } import scala.concurrent.forkjoin.ForkJoinPool import scala.concurrent.util.Duration import scala.concurrent.Awaitable +import scala.concurrent.util.FiniteDuration /** * The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a @@ -32,7 +33,7 @@ class Dispatcher( val throughputDeadlineTime: Duration, val mailboxType: MailboxType, executorServiceFactoryProvider: ExecutorServiceFactoryProvider, - val shutdownTimeout: Duration) + val shutdownTimeout: FiniteDuration) extends MessageDispatcher(_prerequisites) { private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate { diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index 8fdde39cb3..af421ddb96 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -6,6 +6,7 @@ package akka.dispatch import akka.actor.ActorCell import scala.concurrent.util.Duration +import scala.concurrent.util.FiniteDuration /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -18,7 +19,7 @@ class PinnedDispatcher( _actor: ActorCell, _id: String, _mailboxType: MailboxType, - _shutdownTimeout: Duration, + _shutdownTimeout: FiniteDuration, _threadPoolConfig: ThreadPoolConfig = ThreadPoolConfig()) extends Dispatcher(_prerequisites, _id, diff --git a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala index 72335d810b..dc398e7fa2 100644 --- a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala @@ -8,13 +8,14 @@ import scala.concurrent.util.Duration import scala.concurrent.{ ExecutionContext, Promise, Future } import akka.actor._ import scala.util.control.NonFatal +import scala.concurrent.util.FiniteDuration trait FutureTimeoutSupport { /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value * after the specified duration. */ - def after[T](duration: Duration, using: Scheduler)(value: ⇒ Future[T])(implicit ec: ExecutionContext): Future[T] = + def after[T](duration: FiniteDuration, using: Scheduler)(value: ⇒ Future[T])(implicit ec: ExecutionContext): Future[T] = if (duration.isFinite() && duration.length < 1) { try value catch { case NonFatal(t) ⇒ Future.failed(t) } } else { diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index bd86cc4930..c4440f4723 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -111,13 +111,13 @@ object Patterns { * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * after the specified duration. */ - def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] = + def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Callable[Future[T]]): Future[T] = scalaAfter(duration, scheduler)(value.call())(context) /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value * after the specified duration. */ - def after[T](duration: Duration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = + def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = scalaAfter(duration, scheduler)(value)(context) } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 95fe9d9db2..cb6f37eb8d 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -283,7 +283,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi * The reason for the delay is to give concurrent messages a chance to be * placed in mailbox before sending PoisonPill. */ - def removeRoutees(nrOfInstances: Int, stopDelay: Duration): Unit = { + def removeRoutees(nrOfInstances: Int, stopDelay: FiniteDuration): Unit = { if (nrOfInstances <= 0) { throw new IllegalArgumentException("Expected positive nrOfInstances, got [%s]".format(nrOfInstances)) } else if (nrOfInstances > 0) { @@ -298,7 +298,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi * Give concurrent messages a chance to be placed in mailbox before * sending PoisonPill. */ - protected def delayedStop(scheduler: Scheduler, abandon: Iterable[ActorRef], stopDelay: Duration): Unit = { + protected def delayedStop(scheduler: Scheduler, abandon: Iterable[ActorRef], stopDelay: FiniteDuration): Unit = { if (abandon.nonEmpty) { if (stopDelay <= Duration.Zero) { abandon foreach (_ ! PoisonPill) @@ -1332,7 +1332,7 @@ case class DefaultResizer( * messages a chance to be placed in mailbox before sending PoisonPill. * Use 0 seconds to skip delay. */ - stopDelay: Duration = 1.second, + stopDelay: FiniteDuration = 1.second, /** * Number of messages between resize operation. * Use 1 to resize before each message. diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 9a4dcaa62b..f924dc20de 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -5,7 +5,6 @@ package akka.cluster import language.implicitConversions - import akka.actor._ import akka.actor.Status._ import akka.ConfigurationException @@ -20,13 +19,12 @@ import scala.concurrent.util.{ Duration, Deadline } import scala.concurrent.forkjoin.ThreadLocalRandom import scala.annotation.tailrec import scala.collection.immutable.SortedSet - import java.io.Closeable import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference - import akka.util.internal.HashedWheelTimer import concurrent.{ ExecutionContext, Await } +import scala.concurrent.util.FiniteDuration /** * Cluster Extension Id and factory for creating Cluster extension. @@ -111,26 +109,26 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { new Scheduler with Closeable { override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing - override def schedule(initialDelay: Duration, frequency: Duration, + override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = - systemScheduler.schedule(initialDelay, frequency, receiver, message) + systemScheduler.schedule(initialDelay, interval, receiver, message) - override def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = - systemScheduler.schedule(initialDelay, frequency)(f) + override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = + systemScheduler.schedule(initialDelay, interval)(f) - override def schedule(initialDelay: Duration, frequency: Duration, + override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = - systemScheduler.schedule(initialDelay, frequency, runnable) + systemScheduler.schedule(initialDelay, interval, runnable) - override def scheduleOnce(delay: Duration, + override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = systemScheduler.scheduleOnce(delay, runnable) - override def scheduleOnce(delay: Duration, receiver: ActorRef, + override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = systemScheduler.scheduleOnce(delay, receiver, message) - override def scheduleOnce(delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = + override def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = systemScheduler.scheduleOnce(delay)(f) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 9557a20f7a..3d198572c9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -16,6 +16,7 @@ import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import language.existentials import language.postfixOps +import scala.concurrent.util.FiniteDuration /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -191,32 +192,32 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { // start periodic gossip to random nodes in cluster val gossipTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], GossipInterval) { self ! GossipTick } // start periodic heartbeat to all nodes in cluster val heartbeatTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { self ! HeartbeatTick } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) val failureDetectorReaperTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) { self ! ReapUnreachableTick } // start periodic leader action management (only applies for the current leader) private val leaderActionsTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], LeaderActionsInterval) { self ! LeaderActionsTick } // start periodic publish of current state private val publishStateTask: Option[Cancellable] = if (PublishStatsInterval == Duration.Zero) None - else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) { + else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) { self ! PublishStatsTick }) diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala index 396f5127ad..9e6eedf659 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala @@ -9,14 +9,15 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } import akka.actor.{ Scheduler, Cancellable } import scala.concurrent.util.Duration import concurrent.ExecutionContext +import scala.concurrent.util.FiniteDuration /** * INTERNAL API */ private[akka] object FixedRateTask { def apply(scheduler: Scheduler, - initalDelay: Duration, - delay: Duration)(f: ⇒ Unit)(implicit executor: ExecutionContext): FixedRateTask = + initalDelay: FiniteDuration, + delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): FixedRateTask = new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f }) } @@ -28,8 +29,8 @@ private[akka] object FixedRateTask { * initialDelay. */ private[akka] class FixedRateTask(scheduler: Scheduler, - initalDelay: Duration, - delay: Duration, + initalDelay: FiniteDuration, + delay: FiniteDuration, task: Runnable)(implicit executor: ExecutionContext) extends Runnable with Cancellable { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index c92ff0eafb..ea35249303 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -19,6 +19,7 @@ import akka.remote.testconductor.RoleName import akka.actor.Props import akka.actor.Actor import akka.cluster.MemberStatus._ +import scala.concurrent.util.FiniteDuration object LargeClusterMultiJvmSpec extends MultiNodeConfig { // each jvm simulates a datacenter with many nodes @@ -122,8 +123,9 @@ abstract class LargeClusterSpec systems foreach { Cluster(_) } } - def expectedMaxDuration(totalNodes: Int): Duration = - 5.seconds + (2.seconds * totalNodes) + def expectedMaxDuration(totalNodes: Int): FiniteDuration = + // this cast will always succeed, but the compiler does not know about it ... + (5.seconds + (2.seconds * totalNodes)).asInstanceOf[FiniteDuration] def joinAll(from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = { val joiningClusters = systems.map(Cluster(_)).toSet @@ -271,7 +273,7 @@ abstract class LargeClusterSpec val unreachableNodes = nodesPerDatacenter val liveNodes = nodesPerDatacenter * 4 - within(30.seconds + (3.seconds * liveNodes)) { + within((30.seconds + (3.seconds * liveNodes)).asInstanceOf[FiniteDuration]) { val startGossipCounts = Map.empty[Cluster, Long] ++ systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).readView.latestStats.receivedGossipCount)) def gossipCount(c: Cluster): Long = { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 47475fa80f..489c5415ea 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -4,7 +4,6 @@ package akka.cluster import language.implicitConversions - import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.actor.{ Address, ExtendedActorSystem } @@ -18,6 +17,7 @@ import org.scalatest.exceptions.TestFailedException import java.util.concurrent.ConcurrentHashMap import akka.actor.ActorPath import akka.actor.RootActorPath +import scala.concurrent.util.FiniteDuration object MultiNodeClusterSpec { @@ -174,7 +174,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS def awaitUpConvergence( numberOfMembers: Int, canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], - timeout: Duration = 20.seconds): Unit = { + timeout: FiniteDuration = 20.seconds): Unit = { within(timeout) { awaitCond(clusterView.members.size == numberOfMembers) awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) diff --git a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java index 75f102192e..70d3c35142 100644 --- a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -139,7 +139,7 @@ public class FaultHandlingDocSample { if (msg.equals(Start) && progressListener == null) { progressListener = getSender(); getContext().system().scheduler().schedule( - Duration.Zero(), Duration.parse("1 second"), getSelf(), Do, getContext().dispatcher() + Duration.Zero(), Duration.create(1, "second"), getSelf(), Do, getContext().dispatcher() ); } else if (msg.equals(Do)) { counterService.tell(new Increment(1), getSelf()); @@ -299,7 +299,7 @@ public class FaultHandlingDocSample { counter.tell(new UseStorage(null), getSelf()); // Try to re-establish storage after while getContext().system().scheduler().scheduleOnce( - Duration.parse("10 seconds"), getSelf(), Reconnect, getContext().dispatcher() + Duration.create(10, "seconds"), getSelf(), Reconnect, getContext().dispatcher() ); } else if (msg.equals(Reconnect)) { // Re-establish storage after the scheduled delay diff --git a/akka-docs/java/code/docs/future/FutureDocTestBase.java b/akka-docs/java/code/docs/future/FutureDocTestBase.java index 5cc84f9935..ca23065661 100644 --- a/akka-docs/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/docs/future/FutureDocTestBase.java @@ -518,7 +518,7 @@ public class FutureDocTestBase { //#after final ExecutionContext ec = system.dispatcher(); Future failExc = Futures.failed(new IllegalStateException("OHNOES1")); - Future delayed = Patterns.after(Duration.parse("500 millis"), + Future delayed = Patterns.after(Duration.create(500, "millis"), system.scheduler(), ec, failExc); Future future = future(new Callable() { public String call() throws InterruptedException { diff --git a/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java b/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java index f7b246bc57..c392cf131f 100644 --- a/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java +++ b/akka-docs/java/code/docs/zeromq/ZeromqDocTestBase.java @@ -187,7 +187,7 @@ public class ZeromqDocTestBase { @Override public void preStart() { getContext().system().scheduler() - .schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK, getContext().dispatcher()); + .schedule(Duration.create(1, "second"), Duration.create(1, "second"), getSelf(), TICK, getContext().dispatcher()); } @Override diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index 679bb35382..04284b3f12 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -418,3 +418,27 @@ Search Replace with ``akka.actor.mailbox.FileBasedMessageQueue`` ``akka.actor.mailbox.filebased.FileBasedMessageQueue`` ``akka.actor.mailbox.filequeue.*`` ``akka.actor.mailbox.filebased.filequeue.*`` ================================================ ========================================================= + +Actor Receive Timeout +===================== + +The API for setting and querying the receive timeout has been made more +consisten in always taking and returning a ``Duration``, the wrapping in +``Option`` has been removed. + +(Samples for Java, Scala sources are affected in exactly the same way.) + +v2.0:: + + getContext().setReceiveTimeout(Duration.create(10, SECONDS)); + final Option timeout = getContext().receiveTimeout(); + final isSet = timeout.isDefined(); + resetReceiveTimeout(); + +v2.1:: + + getContext().setReceiveTimeout(Duration.create(10, SECONDS)); + final Duration timeout = getContext().receiveTimeout(); + final isSet = timeout.isFinite(); + getContext().setReceiveTimeout(Duration.Undefined()); + diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 3822647ce8..84b2f17173 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.util.{ Timeout } import scala.concurrent.util.{ Deadline, Duration } import scala.reflect.classTag +import scala.concurrent.util.FiniteDuration sealed trait Direction { def includes(other: Direction): Boolean @@ -559,7 +560,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor } onTransition { - case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, nextStateData.deadline.timeLeft, false) + case Idle -> Waiting ⇒ setTimer("Timeout", StateTimeout, nextStateData.deadline.timeLeft.asInstanceOf[FiniteDuration], false) case Waiting -> Idle ⇒ cancelTimer("Timeout") } @@ -570,7 +571,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor val enterDeadline = getDeadline(timeout) // we only allow the deadlines to get shorter if (enterDeadline.timeLeft < deadline.timeLeft) { - setTimer("Timeout", StateTimeout, enterDeadline.timeLeft, false) + setTimer("Timeout", StateTimeout, enterDeadline.timeLeft.asInstanceOf[FiniteDuration], false) handleBarrier(d.copy(arrived = together, deadline = enterDeadline)) } else handleBarrier(d.copy(arrived = together)) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala index 6c8352d880..e1d5fb0854 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala @@ -4,20 +4,17 @@ package akka.remote.testconductor import language.postfixOps - import java.net.InetSocketAddress - import scala.annotation.tailrec import scala.collection.immutable.Queue - import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{ SimpleChannelHandler, MessageEvent, Channels, ChannelStateEvent, ChannelHandlerContext, ChannelFutureListener, ChannelFuture } - import akka.actor.{ Props, LoggingFSM, Address, ActorSystem, ActorRef, ActorLogging, Actor, FSM } import akka.event.Logging import akka.remote.netty.ChannelAddress import scala.concurrent.util.Duration import scala.concurrent.util.duration._ +import scala.concurrent.util.FiniteDuration /** * INTERNAL API. @@ -331,20 +328,20 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) * lead to the correct rate on average, with increased latency of the order of * HWT granularity. */ - private def schedule(d: Data): (Data, Seq[Send], Option[Duration]) = { + private def schedule(d: Data): (Data, Seq[Send], Option[FiniteDuration]) = { val now = System.nanoTime - @tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[Duration]) = { + @tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[FiniteDuration]) = { if (d.queue.isEmpty) (d, toSend, None) else { val timeForPacket = d.lastSent + (1000 * size(d.queue.head.msg) / d.rateMBit).toLong if (timeForPacket <= now) rec(Data(timeForPacket, d.rateMBit, d.queue.tail), toSend :+ d.queue.head) else { val splitThreshold = d.lastSent + packetSplitThreshold.toNanos - if (now < splitThreshold) (d, toSend, Some((timeForPacket - now).nanos min (splitThreshold - now).nanos)) + if (now < splitThreshold) (d, toSend, Some(((timeForPacket - now).nanos min (splitThreshold - now).nanos).asInstanceOf[FiniteDuration])) else { val microsToSend = (now - d.lastSent) / 1000 val (s1, s2) = split(d.queue.head, (microsToSend * d.rateMBit / 8).toInt) - (d.copy(queue = s2 +: d.queue.tail), toSend :+ s1, Some((timeForPacket - now).nanos min packetSplitThreshold)) + (d.copy(queue = s2 +: d.queue.tail), toSend :+ s1, Some(((timeForPacket - now).nanos min packetSplitThreshold).asInstanceOf[FiniteDuration])) } } } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 2b18bdbabb..03b07486f0 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -278,7 +278,7 @@ private[akka] class PlayerHandler( event.getCause match { case c: ConnectException if reconnects > 0 ⇒ reconnects -= 1 - scheduler.scheduleOnce(nextAttempt.timeLeft)(reconnect()) + scheduler.scheduleOnce(nextAttempt.timeLeft.asInstanceOf[FiniteDuration])(reconnect()) case e ⇒ fsm ! ConnectionFailure(e.getMessage) } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala index fe1b31ce45..64dc611396 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala @@ -7,6 +7,7 @@ import akka.actor.ActorRefFactory import scala.reflect.ClassTag import scala.concurrent.util.Duration import akka.actor.{ FSM, Actor, ActorRef } +import scala.concurrent.util.FiniteDuration /* * generic typed object buncher. @@ -35,7 +36,7 @@ object GenericBuncher { } } -abstract class GenericBuncher[A: ClassTag, B](val singleTimeout: Duration, val multiTimeout: Duration) +abstract class GenericBuncher[A: ClassTag, B](val singleTimeout: FiniteDuration, val multiTimeout: FiniteDuration) extends Actor with FSM[GenericBuncher.State, B] { import GenericBuncher._ import FSM._ @@ -85,7 +86,7 @@ object Buncher { val Flush = GenericBuncher.Flush } -class Buncher[A: ClassTag](singleTimeout: Duration, multiTimeout: Duration) +class Buncher[A: ClassTag](singleTimeout: FiniteDuration, multiTimeout: FiniteDuration) extends GenericBuncher[A, List[A]](singleTimeout, multiTimeout) { import Buncher._ diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index e97c76e6af..902eb797d2 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -4,11 +4,11 @@ package sample.fsm.dining.fsm import language.postfixOps - import akka.actor._ import akka.actor.FSM._ import scala.concurrent.util.Duration import scala.concurrent.util.duration._ +import scala.concurrent.util.FiniteDuration /* * Some messages for the chopstick @@ -159,7 +159,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // Initialize the hakker initialize - private def startThinking(duration: Duration): State = { + private def startThinking(duration: FiniteDuration): State = { goto(Thinking) using TakenChopsticks(None, None) forMax duration } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index bab4601587..90f3b2f545 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -7,6 +7,7 @@ package akka.testkit import akka.actor._ import scala.concurrent.util.Duration import akka.dispatch.DispatcherPrerequisites +import scala.concurrent.util.FiniteDuration /** * This is a specialised form of the TestActorRef with support for querying and @@ -59,14 +60,14 @@ class TestFSMRef[S, D, T <: Actor]( * corresponding transition initiated from within the FSM, including timeout * and stop handling. */ - def setState(stateName: S = fsm.stateName, stateData: D = fsm.stateData, timeout: Duration = null, stopReason: Option[FSM.Reason] = None) { + def setState(stateName: S = fsm.stateName, stateData: D = fsm.stateData, timeout: FiniteDuration = null, stopReason: Option[FSM.Reason] = None) { fsm.applyState(FSM.State(stateName, stateData, Option(timeout), stopReason)) } /** * Proxy for FSM.setTimer. */ - def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean) { + def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean) { fsm.setTimer(name, msg, timeout, repeat) } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index f033df302f..2e835b408f 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -4,7 +4,6 @@ package akka.testkit import language.{ postfixOps, reflectiveCalls } - import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ @@ -14,6 +13,7 @@ import scala.concurrent.util.duration._ import akka.actor.ActorSystem import akka.pattern.ask import akka.dispatch.Dispatcher +import scala.concurrent.util.Duration /** * Test whether TestActorRef behaves as an ActorRef should, besides its own spec. @@ -244,7 +244,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA "set receiveTimeout to None" in { val a = TestActorRef[WorkerActor] - a.underlyingActor.context.receiveTimeout must be(None) + a.underlyingActor.context.receiveTimeout must be theSameInstanceAs Duration.Undefined } "set CallingThreadDispatcher" in { diff --git a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala index 1d393afef0..ea7fb82d07 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala @@ -9,6 +9,7 @@ import akka.actor.ActorRef import scala.concurrent.util.duration._ import scala.concurrent.util.Duration import org.zeromq.ZMQ.{ Poller, Socket } +import scala.concurrent.util.FiniteDuration /** * Marker trait representing request messages for zeromq @@ -152,7 +153,7 @@ case class PollDispatcher(name: String) extends SocketMeta * An option containing the duration a poll cycle should wait for a message before it loops * @param duration */ -case class PollTimeoutDuration(duration: Duration = 100 millis) extends SocketMeta +case class PollTimeoutDuration(duration: FiniteDuration = 100 millis) extends SocketMeta /** * Start listening with this server socket on the specified address