Moved Timeout classes from akka.actor._ to akka.util._.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-12-14 17:30:54 +01:00
parent 0af92f2440
commit 04cd2adee9
30 changed files with 89 additions and 73 deletions

BIN
_mb/mailbox_user__b Normal file

Binary file not shown.

BIN
_mb/mailbox_user_myactor Normal file

Binary file not shown.

View file

@ -1,6 +1,6 @@
package akka.dispatch;
import akka.actor.Timeout;
import akka.util.Timeout;
import akka.actor.ActorSystem;
import akka.japi.*;
@ -21,7 +21,7 @@ public class JavaFutureTests {
private static ActorSystem system;
private static Timeout t;
private final Duration timeout = Duration.create(5, TimeUnit.SECONDS);
@BeforeClass

View file

@ -8,6 +8,7 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.util.Timeout
import akka.util.duration._
import java.lang.IllegalStateException
import akka.util.ReflectiveAccess

View file

@ -9,6 +9,7 @@ import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import java.util.concurrent.TimeoutException
import akka.dispatch.Await
import akka.util.Timeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {

View file

@ -6,6 +6,7 @@ package akka.actor
import akka.testkit._
import akka.util.duration._
import akka.util.Timeout
import akka.dispatch.{ Await, Future }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])

View file

@ -6,6 +6,7 @@ package akka.actor
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
import akka.serialization.Serialization
import java.util.concurrent.atomic.AtomicReference

View file

@ -6,6 +6,7 @@ package akka.actor.dispatch
import org.scalatest.Assertions._
import akka.testkit._
import akka.dispatch._
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import akka.util.Switch
@ -493,4 +494,4 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}
}
}
}

View file

@ -2,7 +2,7 @@ package akka.dispatch
import Future.flow
import akka.util.cps._
import akka.actor.Timeout
import akka.util.Timeout
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout

View file

@ -115,32 +115,6 @@ object Status {
case class Failure(cause: Throwable) extends Status
}
case class Timeout(duration: Duration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
object Timeout {
/**
* A timeout with zero duration, will cause most requests to always timeout.
*/
val zero = new Timeout(Duration.Zero)
/**
* A Timeout with infinite duration. Will never timeout. Use extreme caution with this
* as it may cause memory leaks, blocked threads, or may not even be supported by
* the receiver, which would result in an exception.
*/
val never = new Timeout(Duration.Inf)
def apply(timeout: Long) = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
}
trait ActorLogging { this: Actor
val log = akka.event.Logging(context.system.eventStream, context.self)
}

View file

@ -8,10 +8,12 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
import scala.annotation.tailrec
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
import akka.actor.Timeout.intToTimeout
import akka.util.Timeout
import akka.util.Timeout.intToTimeout
import akka.config.ConfigurationException
import akka.dispatch._
import akka.routing._
import akka.util.Timeout
import akka.AkkaException
import com.eaio.uuid.UUID
import akka.util.{ Duration, Switch, Helpers }

View file

@ -8,6 +8,8 @@ import akka.actor._
import akka.event._
import akka.dispatch._
import akka.util.duration._
import akka.util.Timeout
import akka.util.Timeout._
import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.NANOSECONDS
@ -464,8 +466,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
}
/*
* This is called after the last actor has signaled its termination, i.e.
* after the last dispatcher has had its chance to schedule its shutdown
* This is called after the last actor has signaled its termination, i.e.
* after the last dispatcher has had its chance to schedule its shutdown
* action.
*/
protected def stopScheduler(): Unit = scheduler match {

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.japi.{ Creator, Option JOption }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import akka.util.{ Duration, Timeout }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.{ Serializer, Serialization }
import akka.dispatch._
@ -481,4 +481,4 @@ class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory wit
}
}
else null
}
}

View file

@ -7,7 +7,7 @@ package akka.dispatch
import akka.AkkaException
import akka.event.Logging.Error
import akka.actor.Timeout
import akka.util.Timeout
import scala.Option
import akka.japi.{ Procedure, Function JFunc, Option JOption }

View file

@ -7,7 +7,7 @@ package akka.dispatch
import java.util.concurrent.atomic.AtomicReference
import scala.util.continuations._
import scala.annotation.{ tailrec }
import akka.actor.Timeout
import akka.util.Timeout
object PromiseStream {
def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): PromiseStream[A] = new PromiseStream[A]

View file

@ -3,7 +3,7 @@
*/
package akka.dispatch.japi
import akka.actor.Timeout
import akka.util.Timeout
import akka.japi.{ Procedure2, Procedure, Function JFunc, Option JOption }
/* Java API */

View file

@ -10,7 +10,7 @@ import akka.util.ReflectiveAccess
import akka.config.ConfigurationException
import akka.util.ReentrantGuard
import akka.util.duration._
import akka.actor.Timeout
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
import scala.util.control.NoStackTrace

View file

@ -6,6 +6,7 @@ package akka.routing
import akka.AkkaException
import akka.actor._
import akka.util.Timeout
import akka.config.ConfigurationException
import akka.dispatch.{ Future, MessageDispatcher }
import akka.util.{ ReflectiveAccess, Duration }

View file

@ -545,3 +545,29 @@ class DurationDouble(d: Double) {
def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, DAYS))
}
case class Timeout(duration: Duration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
object Timeout {
/**
* A timeout with zero duration, will cause most requests to always timeout.
*/
val zero = new Timeout(Duration.Zero)
/**
* A Timeout with infinite duration. Will never timeout. Use extreme caution with this
* as it may cause memory leaks, blocked threads, or may not even be supported by
* the receiver, which would result in an exception.
*/
val never = new Timeout(Duration.Inf)
def apply(timeout: Long) = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
}

View file

@ -2,7 +2,7 @@ package akka.util
import scala.util.continuations._
import akka.dispatch.MessageDispatcher
import akka.actor.Timeout
import akka.util.Timeout
package object cps {
def matchC[A, B, C, D](in: A)(pf: PartialFunction[A, B @cpsParam[C, D]]): B @cpsParam[C, D] = pf(in)

View file

@ -3,7 +3,7 @@ package akka.camel;
import akka.actor.Actor;
import akka.actor.TypedActor;
import akka.actor.Props;
import akka.actor.Timeout;
import akka.util.Timeout;
import akka.dispatch.Dispatchers;
import akka.japi.SideEffect;
import akka.util.FiniteDuration;

View file

@ -73,7 +73,7 @@ a top level actor, that is supervised by the system (internal guardian actor).
.. includecode:: code/ActorDocSpec.scala#context-actorOf
Actors are automatically started asynchronously when created.
When you create the ``Actor`` then it will automatically call the ``preStart``
When you create the ``Actor`` then it will automatically call the ``preStart``
callback method on the ``Actor`` trait. This is an excellent place to
add initialization code for the actor.
@ -98,7 +98,7 @@ Here is an example:
Creating Actors with Props
--------------------------
``Props`` is a configuration object to specify additional things for the actor to
``Props`` is a configuration object to specify additional things for the actor to
be created, such as the ``MessageDispatcher``.
.. includecode:: code/ActorDocSpec.scala#creating-props
@ -128,7 +128,7 @@ Actor API
The :class:`Actor` trait defines only one abstract method, the above mentioned
:meth:`receive`, which implements the behavior of the actor.
If the current actor behavior does not match a received message, :meth:`unhandled`
If the current actor behavior does not match a received message, :meth:`unhandled`
is called, which by default throws an :class:`UnhandledMessageException`.
In addition, it offers:
@ -145,7 +145,7 @@ In addition, it offers:
You can import the members in the :obj:`context` to avoid prefixing access with ``context.``
.. includecode:: code/ActorDocSpec.scala#import-context
.. includecode:: code/ActorDocSpec.scala#import-context
The remaining visible methods are user-overridable life-cycle hooks which are
described in the following::
@ -195,7 +195,7 @@ processing a message. This restart involves the hooks mentioned above:
An actor restart replaces only the actual actor object; the contents of the
mailbox and the hotswap stack are unaffected by the restart, so processing of
messages will resume after the :meth:`postRestart` hook returns. The message
messages will resume after the :meth:`postRestart` hook returns. The message
that triggered the exception will not be received again. Any message
sent to an actor while it is being restarted will be queued to its mailbox as
usual.
@ -205,9 +205,9 @@ Stop Hook
After stopping an actor, its :meth:`postStop` hook is called, which may be used
e.g. for deregistering this actor from other services. This hook is guaranteed
to run after message queuing has been disabled for this actor, i.e. messages
sent to a stopped actor will be redirected to the :obj:`deadLetters` of the
:obj:`ActorSystem`.
to run after message queuing has been disabled for this actor, i.e. messages
sent to a stopped actor will be redirected to the :obj:`deadLetters` of the
:obj:`ActorSystem`.
Identifying Actors
@ -267,7 +267,7 @@ implicitly passed along with the message and available to the receiving Actor
in its ``sender: ActorRef`` member field. The target actor can use this
to reply to the original sender, by using ``sender ! replyMsg``.
If invoked from an instance that is **not** an Actor the sender will be
If invoked from an instance that is **not** an Actor the sender will be
:obj:`deadLetters` actor reference by default.
Ask: Send-And-Receive-Future
@ -281,11 +281,11 @@ will immediately return a :class:`Future`:
val future = actor ? "hello"
The receiving actor should reply to this message, which will complete the
future with the reply message as value; ``sender ! result``.
future with the reply message as value; ``sender ! result``.
To complete the future with an exception you need send a Failure message to the sender.
This is not done automatically when an actor throws an exception while processing a
message.
To complete the future with an exception you need send a Failure message to the sender.
This is not done automatically when an actor throws an exception while processing a
message.
.. includecode:: code/ActorDocSpec.scala#reply-exception
@ -293,11 +293,11 @@ If the actor does not complete the future, it will expire after the timeout peri
which is taken from one of the following locations in order of precedence:
#. explicitly given timeout as in ``actor.?("hello")(timeout = 12 millis)``
#. implicit argument of type :class:`akka.actor.Timeout`, e.g.
#. implicit argument of type :class:`akka.util.Timeout`, e.g.
::
import akka.actor.Timeout
import akka.util.Timeout
import akka.util.duration._
implicit val timeout = Timeout(12 millis)
@ -306,8 +306,8 @@ which is taken from one of the following locations in order of precedence:
See :ref:`futures-scala` for more information on how to await or query a
future.
The ``onComplete``, ``onResult``, or ``onTimeout`` methods of the ``Future`` can be
used to register a callback to get a notification when the Future completes.
The ``onComplete``, ``onResult``, or ``onTimeout`` methods of the ``Future`` can be
used to register a callback to get a notification when the Future completes.
Gives you a way to avoid blocking.
.. warning::
@ -404,17 +404,17 @@ object.
Stopping actors
===============
Actors are stopped by invoking the ``stop`` method of the ``ActorRef``.
Actors are stopped by invoking the ``stop`` method of the ``ActorRef``.
The actual termination of the actor is performed asynchronously, i.e.
``stop`` may return before the actor is stopped.
``stop`` may return before the actor is stopped.
.. code-block:: scala
actor.stop()
Processing of the current message, if any, will continue before the actor is stopped,
Processing of the current message, if any, will continue before the actor is stopped,
but additional messages in the mailbox will not be processed. By default these
messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that
messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that
depends on the mailbox implementation.
When stop is called then a call to the ``def postStop`` callback method will
@ -541,11 +541,11 @@ messages on that mailbox, will be there as well.
What happens to the actor
-------------------------
If an exception is thrown, the actor instance is discarded and a new instance is
If an exception is thrown, the actor instance is discarded and a new instance is
created. This new instance will now be used in the actor references to this actor
(so this is done invisible to the developer). Note that this means that current
state of the failing actor instance is lost if you don't store and restore it in
``preRestart`` and ``postRestart`` callbacks.
(so this is done invisible to the developer). Note that this means that current
state of the failing actor instance is lost if you don't store and restore it in
``preRestart`` and ``postRestart`` callbacks.
Extending Actors using PartialFunction chaining

View file

@ -198,7 +198,7 @@ Then there's a method that's called ``fold`` that takes a start-value, a sequenc
.. code-block:: scala
val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
val futureSum = Future.fold(0)(futures)(_ + _)
That's all it takes!
@ -244,7 +244,7 @@ In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` pr
Timeouts
--------
Waiting forever for a ``Future`` to be completed can be dangerous. It could cause your program to block indefinitly or produce a memory leak. ``Future`` has support for a timeout already builtin with a default of 5 seconds (taken from :ref:`configuration`). A timeout is an instance of ``akka.actor.Timeout`` which contains an ``akka.util.Duration``. A ``Duration`` can be finite, which needs a length and unit type, or infinite. An infinite ``Timeout`` can be dangerous since it will never actually expire.
Waiting forever for a ``Future`` to be completed can be dangerous. It could cause your program to block indefinitly or produce a memory leak. ``Future`` has support for a timeout already builtin with a default of 5 seconds (taken from :ref:`configuration`). A timeout is an instance of ``akka.util.Timeout`` which contains an ``akka.util.Duration``. A ``Duration`` can be finite, which needs a length and unit type, or infinite. An infinite ``Timeout`` can be dangerous since it will never actually expire.
A different ``Timeout`` can be supplied either explicitly or implicitly when a ``Future`` is created. An implicit ``Timeout`` has the benefit of being usable by a for-comprehension as well as being picked up by any methods looking for an implicit ``Timeout``, while an explicit ``Timeout`` can be used in a more controlled manner.
@ -262,7 +262,7 @@ Implicit ``Timeout`` example:
.. code-block:: scala
import akka.actor.Timeout
import akka.util.Timeout
import akka.util.duration._
implicit val longTimeout = Timeout(1 minute)

View file

@ -11,6 +11,7 @@ import akka.actor.Status._
import akka.routing._
import akka.dispatch._
import akka.util.duration._
import akka.util.Timeout
import akka.config.ConfigurationException
import akka.event.{ DeathWatch, Logging }
import akka.serialization.Compression.LZF

View file

@ -9,6 +9,7 @@ import akka.actor._
import akka.stm._
import akka.japi.{ Function JFunc, Procedure JProc }
import akka.dispatch._
import akka.util.Timeout
/**
* Used internally to send functions.

View file

@ -3,7 +3,7 @@ package akka.agent.test
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import akka.actor.Timeout
import akka.util.Timeout
import akka.agent.Agent
import akka.stm._
import akka.util.Duration

View file

@ -6,6 +6,7 @@ import akka.actor.ActorSystem
import akka.actor._
import akka.stm.{ Ref, TransactionFactory }
import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
import akka.dispatch.Await

View file

@ -5,6 +5,7 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor.ActorSystem
import akka.actor._
import akka.util.Timeout
import akka.stm._
import akka.util.duration._
import akka.testkit._

View file

@ -5,6 +5,7 @@ import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import akka.actor._
import akka.util.Timeout
import akka.stm._
import akka.util.duration._
import akka.testkit._

View file

@ -11,6 +11,7 @@ import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.japi.Creator;
import akka.routing.*;
import akka.util.Timeout;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
@ -107,7 +108,7 @@ public class Pi {
this.latch = latch;
Creator<Router> routerCreator = new Creator<Router>() {
public Router create() {
return new RoundRobinRouter(getContext().dispatcher(), new akka.actor.Timeout(-1));
return new RoundRobinRouter(getContext().dispatcher(), new Timeout(-1));
}
};
LinkedList<ActorRef> actors = new LinkedList<ActorRef>() {
@ -116,7 +117,7 @@ public class Pi {
}
};
// FIXME routers are intended to be used like this
RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true);
RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new Timeout(-1), true);
router = new RoutedActorRef(getContext().system(), props, (InternalActorRef) getSelf(), "pi");
}