diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala index 87e251e494..c7c7b55c72 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala @@ -21,7 +21,7 @@ class ActorTimeoutSpec } }).start() - val testTimeout = if (Actor.Timeout.default.duration < 400.millis) 500 millis else 100 millis + val testTimeout = if (Timeout.default.duration < 400.millis) 500 millis else 100 millis override def afterAll { echo.stop() } @@ -36,7 +36,7 @@ class ActorTimeoutSpec } "use implicitly supplied timeout" in { - implicit val timeout = Actor.Timeout(testTimeout) + implicit val timeout = Timeout(testTimeout) within(testTimeout - 100.millis, testTimeout + 300.millis) { val f = (echo ? "hallo").mapTo[String] intercept[FutureTimeoutException] { f.await } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 3544767453..5ffc36446e 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -55,7 +55,7 @@ class RoutingSpec extends WordSpec with MustMatchers { case Test3 ⇒ t2 }.start() - implicit val timeout = Actor.Timeout((5 seconds).dilated) + implicit val timeout = Timeout((5 seconds).dilated) val result = for { a ← (d ? (Test1)).as[Int] b ← (d ? (Test2)).as[Int] diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index ca07252423..4867a0db23 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -106,6 +106,36 @@ object Status { case class Failure(cause: Throwable) extends Status } +case class Timeout(duration: Duration) { + def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) + def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) +} +object Timeout { + /** + * The default timeout, based on the config setting 'akka.actor.timeout' + */ + implicit val default = new Timeout(Actor.TIMEOUT) + + /** + * A timeout with zero duration, will cause most requests to always timeout. + */ + val zero = new Timeout(Duration.Zero) + + /** + * A Timeout with infinite duration. Will never timeout. Use extreme caution with this + * as it may cause memory leaks, blocked threads, or may not even be supported by + * the receiver, which would result in an exception. + */ + val never = new Timeout(Duration.Inf) + + def apply(timeout: Long) = new Timeout(timeout) + def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) + + implicit def durationToTimeout(duration: Duration) = new Timeout(duration) + implicit def intToTimeout(timeout: Int) = new Timeout(timeout) + implicit def longToTimeout(timeout: Long) = new Timeout(timeout) +} + /** * Actor factory module with factory methods for creating various kinds of Actors. * @@ -140,41 +170,6 @@ object Actor extends ListenerManagement { override def initialValue = Stack[ActorRef]() } - case class Timeout(duration: Duration) { - def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) - def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) - } - object Timeout { - /** - * The default timeout, based on the config setting 'akka.actor.timeout' - */ - implicit val default = new Timeout(TIMEOUT) - - /** - * A timeout with zero duration, will cause most requests to always timeout. - */ - val zero = new Timeout(Duration.Zero) - - /** - * A Timeout with infinite duration. Will never timeout. Use extreme caution with this - * as it may cause memory leaks, blocked threads, or may not even be supported by - * the receiver, which would result in an exception. - */ - val never = new Timeout(Duration.Inf) - - /** - * Used to indicate that this timeout should not be used. - */ - val none = new Timeout(Duration.MinusInf) - - def apply(timeout: Long) = new Timeout(timeout) - def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) - - implicit def durationToTimeout(duration: Duration) = new Timeout(duration) - implicit def intToTimeout(timeout: Int) = new Timeout(timeout) - implicit def longToTimeout(timeout: Long) = new Timeout(timeout) - } - private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 2c15540842..7ee17ef223 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -13,7 +13,6 @@ import akka.serialization.{ Format, Serializer } import ReflectiveAccess._ import ClusterModule._ import DeploymentConfig.{ ReplicationScheme, Replication, Transient, WriteThrough, WriteBehind } -import akka.actor.Actor.Timeout import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -250,7 +249,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def ask(message: AnyRef, timeout: Long, sender: ActorRef): Future[AnyRef] = - ?(message, Actor.Timeout(timeout))(sender).asInstanceOf[Future[AnyRef]] + ?(message, Timeout(timeout))(sender).asInstanceOf[Future[AnyRef]] /** * Akka Java API.
@@ -1165,7 +1164,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR * * */ - def !(message: Any)(implicit channel: UntypedChannel = NullChannel): Unit = { + def !(message: Any)(implicit channel: UntypedChannel): Unit = { if (isRunning) postMessageToMailbox(message, channel) else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") @@ -1196,14 +1195,15 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR /** * Sends a message asynchronously, returning a future which may eventually hold the reply. */ - def ?(message: Any, timeout: Timeout = Timeout.none)(implicit channel: UntypedChannel = NullChannel, implicitTimeout: Actor.Timeout = Timeout.default): Future[Any] = { + def ?(message: Any)(implicit channel: UntypedChannel, timeout: Timeout): Future[Any] = { if (isRunning) { - val realTimeout = if (timeout eq Timeout.none) implicitTimeout else timeout - postMessageToMailboxAndCreateFutureResultWithTimeout(message, realTimeout, channel) + postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel) } else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") } + def ?(message: Any, timeout: Timeout)(implicit channel: UntypedChannel): Future[Any] = ?(message)(channel, timeout) + /** * Forwards the message and passes the original sender actor as the sender. * diff --git a/akka-actor/src/main/scala/akka/actor/Channel.scala b/akka-actor/src/main/scala/akka/actor/Channel.scala index 2896883f2b..5604a5f9d6 100644 --- a/akka-actor/src/main/scala/akka/actor/Channel.scala +++ b/akka-actor/src/main/scala/akka/actor/Channel.scala @@ -127,6 +127,9 @@ object UntypedChannel { case Some(actor) ⇒ actor case None ⇒ NullChannel } + + implicit final val default: UntypedChannel = NullChannel + } /** diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 0ba0e44d5c..ac226a8734 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -41,7 +41,7 @@ object TypedActor { case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ - implicit val timeout = Actor.Timeout(actor.timeout) + implicit val timeout = Timeout(actor.timeout) MethodCall(method, args) match { case m if m.isOneWay ⇒ actor ! m diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 68fa9daf3a..1d26fdea7d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -6,8 +6,7 @@ package akka.dispatch import akka.AkkaException import akka.event.EventHandler -import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler } -import akka.actor.Actor.Timeout +import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler, Timeout } import akka.util.{ Duration, BoxedType } import akka.japi.{ Procedure, Function ⇒ JFunc } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 29ea723767..3dd4041c62 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -90,7 +90,7 @@ trait MessageDispatcher { dispatch(invocation) } - private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Actor.Timeout): Future[T] = { + private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Timeout): Future[T] = { futures.getAndIncrement() try { val future = new DefaultPromise[T](timeout) diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index 0000a74503..58e9754a08 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -312,7 +312,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def shutdownLinkedActors: Unit = unsupported def supervisor: Option[ActorRef] = unsupported def homeAddress: Option[InetSocketAddress] = None - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long, channel: UntypedChannel) = unsupported + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Timeout, channel: UntypedChannel) = unsupported protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox_=(msg: AnyRef): AnyRef = unsupported protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 72ee614752..fe2f80e529 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -8,7 +8,7 @@ import akka.actor.Actor._ import akka.routing.{Routing, CyclicIterator} import Routing._ import akka.event.EventHandler -import akka.actor.{Channel, Actor, PoisonPill} +import akka.actor.{Channel, Actor, PoisonPill, Timeout} import akka.dispatch.Future import System.{currentTimeMillis => now} @@ -104,7 +104,7 @@ object Pi extends App { val start = now //send calculate message - master.?(Calculate, Actor.Timeout(60000)). + master.?(Calculate, Timeout(60000)). await.resultOrException match {//wait for the result, with a 60 seconds timeout case Some(pi) => EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))