diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 61a1e84f7e..84367f7ec0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -4,7 +4,6 @@ package akka.actor import language.postfixOps - import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.util.Timeout import scala.concurrent.{ Await, Future, Promise } @@ -21,6 +20,7 @@ import akka.serialization.JavaSerializer import akka.actor.TypedActor._ import java.lang.IllegalStateException import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } +import scala.concurrent.util.FiniteDuration object TypedActorSpec { @@ -203,10 +203,10 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) def newFooBar: Foo = newFooBar(Duration(2, "s")) - def newFooBar(d: Duration): Foo = + def newFooBar(d: FiniteDuration): Foo = TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d))) - def newFooBar(dispatcher: String, d: Duration): Foo = + def newFooBar(dispatcher: String, d: FiniteDuration): Foo = TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)).withDispatcher(dispatcher)) def newStacked(): Stacked = diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index a67f65d870..7104e2edb6 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -68,16 +68,6 @@ class AskSpec extends AkkaSpec { }.getMessage must be === expectedMsg } - "return broken promises on infinite timeout" in { - implicit val timeout = Timeout.never - val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } })) - val f = echo ? "foo" - val expectedMsg = "Timeouts to `ask` must be finite. Question not sent to [%s]" format echo - intercept[IllegalArgumentException] { - Await.result(f, remaining) - }.getMessage must be === expectedMsg - } - } } \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 0de9c9d1c8..561e3d54dd 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -4,7 +4,6 @@ package akka.routing import language.postfixOps - import akka.actor.Actor import akka.testkit._ import akka.actor.Props @@ -15,6 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.pattern.ask import scala.concurrent.util.Duration import java.util.concurrent.TimeoutException +import scala.concurrent.util.FiniteDuration object ResizerSpec { @@ -174,8 +174,8 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with val router = system.actorOf(Props(new Actor { def receive = { - case d: Duration ⇒ Thread.sleep(d.dilated.toMillis); sender ! "done" - case "echo" ⇒ sender ! "reply" + case d: FiniteDuration ⇒ Thread.sleep(d.dilated.toMillis); sender ! "done" + case "echo" ⇒ sender ! "reply" } }).withRouter(RoundRobinRouter(resizer = Some(resizer)))) @@ -190,7 +190,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with routees(router) must be(3) - def loop(loops: Int, d: Duration) = { + def loop(loops: Int, d: FiniteDuration) = { for (m ← 0 until loops) router ! d for (m ← 0 until loops) expectMsg(d * 3, "done") } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index aaa5432815..e10e5350bc 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -4,7 +4,6 @@ package akka.actor import language.existentials - import akka.japi.{ Creator, Option ⇒ JOption } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.Timeout @@ -20,6 +19,7 @@ import scala.reflect.ClassTag import akka.serialization.{ JavaSerializer, SerializationExtension } import java.io.ObjectStreamException import scala.util.{ Try, Success, Failure } +import scala.concurrent.util.FiniteDuration /** * A TypedActorFactory is something that can created TypedActor instances. @@ -421,7 +421,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi /** * INTERNAL USE ONLY */ - private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: Duration) { + private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) { @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match { case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value") case some ⇒ toTypedActorInvocationHandler(some) diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala index 9857023e82..e01e87262d 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -169,7 +169,7 @@ trait Inbox { this: ActorDSL.type ⇒ * this method within an actor! */ def receive(timeout: FiniteDuration = defaultTimeout): Any = { - implicit val t = Timeout(timeout + extraTime) + implicit val t = Timeout((timeout + extraTime).asInstanceOf[FiniteDuration]) Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf) } @@ -186,7 +186,7 @@ trait Inbox { this: ActorDSL.type ⇒ * this method within an actor! */ def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = { - implicit val t = Timeout(timeout + extraTime) + implicit val t = Timeout((timeout + extraTime).asInstanceOf[FiniteDuration]) predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 345d133ea4..7e3c3ce14d 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -78,8 +78,7 @@ trait AskSupport { actorRef.tell(message) Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef)) case ref: InternalActorRef ⇒ - if (!timeout.duration.isFinite) Future.failed[Any](new IllegalArgumentException("Timeouts to `ask` must be finite. Question not sent to [%s]" format actorRef)) - else if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef)) + if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef)) else { val provider = ref.provider val a = PromiseActorRef(provider, timeout) diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index a9cb8bc0c5..df228f821d 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -10,7 +10,7 @@ import akka.util.Unsafe import scala.util.control.NoStackTrace import java.util.concurrent.{ Callable, CopyOnWriteArrayList } import scala.concurrent.{ ExecutionContext, Future, Promise, Await } -import scala.concurrent.util.{ Duration, Deadline } +import scala.concurrent.util.{ FiniteDuration, Deadline } import scala.concurrent.util.duration._ import scala.util.control.NonFatal import scala.util.Success @@ -38,8 +38,8 @@ object CircuitBreaker { * @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure * @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit */ - def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker = - new CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(syncExecutionContext) + def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = + new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(syncExecutionContext) /** * Callbacks run in caller's thread when using withSyncCircuitBreaker, and in same ExecutionContext as the passed @@ -52,8 +52,8 @@ object CircuitBreaker { * @param callTimeout [[scala.concurrent.util.Duration]] of time after which to consider a call a failure * @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit */ - def create(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration): CircuitBreaker = - apply(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) + def create(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = + apply(scheduler, maxFailures, callTimeout, resetTimeout) } /** @@ -76,9 +76,9 @@ object CircuitBreaker { * @param resetTimeout [[scala.concurrent.util.Duration]] of time after which to attempt to close the circuit * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners */ -class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { +class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { - def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: Duration, resetTimeout: Duration) = { + def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = { this(scheduler, maxFailures, callTimeout, resetTimeout)(executor) } @@ -409,7 +409,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @return Future containing result of protected call */ override def invoke[T](body: ⇒ Future[T]): Future[T] = - if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(Duration.Zero)).future + if (compareAndSet(true, false)) callThrough(body) else Promise.failed[T](new CircuitBreakerOpenException(0.seconds)).future /** * Reset breaker on successful call. @@ -453,7 +453,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @return Future containing result of protected call */ override def invoke[T](body: ⇒ Future[T]): Future[T] = - Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft)).future + Promise.failed[T](new CircuitBreakerOpenException(remainingTimeout().timeLeft.asInstanceOf[FiniteDuration])).future /** * Calculate remaining timeout to inform the caller in case a backoff algorithm is useful @@ -510,6 +510,6 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati * @param message Defaults to "Circuit Breaker is open; calls are failing fast" */ class CircuitBreakerOpenException( - val remainingDuration: Duration, + val remainingDuration: FiniteDuration, message: String = "Circuit Breaker is open; calls are failing fast") extends AkkaException(message) with NoStackTrace diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 3db168e4c4..37fcc532e6 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -10,6 +10,7 @@ import akka.dispatch.{ Unwatch, Watch } import scala.concurrent.Future import scala.concurrent.util.Duration import scala.util.Success +import scala.concurrent.util.FiniteDuration trait GracefulStopSupport { /** @@ -36,7 +37,7 @@ trait GracefulStopSupport { * If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ - def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { + def gracefulStop(target: ActorRef, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Boolean] = { if (target.isTerminated) Future successful true else system match { case e: ExtendedActorSystem ⇒ diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 8dda900e35..bd86cc4930 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -6,6 +6,7 @@ package akka.pattern import akka.actor.Scheduler import scala.concurrent.ExecutionContext import java.util.concurrent.Callable +import scala.concurrent.util.FiniteDuration object Patterns { import akka.actor.{ ActorRef, ActorSystem } @@ -103,7 +104,7 @@ object Patterns { * If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ - def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = + def gracefulStop(target: ActorRef, timeout: FiniteDuration, system: ActorSystem): Future[java.lang.Boolean] = scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] /** diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 1d18e7ed2e..95fe9d9db2 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -5,7 +5,6 @@ package akka.routing import language.implicitConversions import language.postfixOps - import akka.actor._ import scala.concurrent.util.Duration import scala.concurrent.util.duration._ @@ -19,6 +18,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom import akka.dispatch.Dispatchers import scala.annotation.tailrec import concurrent.ExecutionContext +import scala.concurrent.util.FiniteDuration /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -1098,13 +1098,13 @@ object ScatterGatherFirstCompletedRouter { /** * Creates a new ScatterGatherFirstCompletedRouter, routing to the specified routees, timing out after the specified Duration */ - def apply(routees: Iterable[ActorRef], within: Duration): ScatterGatherFirstCompletedRouter = + def apply(routees: Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter = new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within) /** * Java API to create router with the supplied 'routees' actors. */ - def create(routees: java.lang.Iterable[ActorRef], within: Duration): ScatterGatherFirstCompletedRouter = { + def create(routees: java.lang.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter = { import scala.collection.JavaConverters._ apply(routees.asScala, within) } @@ -1153,7 +1153,7 @@ object ScatterGatherFirstCompletedRouter { * using `actorFor` in [[akka.actor.ActorRefProvider]] */ @SerialVersionUID(1L) -case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration, +case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: FiniteDuration, override val resizer: Option[Resizer] = None, val routerDispatcher: String = Dispatchers.DefaultDispatcherId, val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy) @@ -1166,7 +1166,7 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It * Constructor that sets nrOfInstances to be created. * Java API */ - def this(nr: Int, w: Duration) = this(nrOfInstances = nr, within = w) + def this(nr: Int, w: FiniteDuration) = this(nrOfInstances = nr, within = w) /** * Constructor that sets the routees to be used. @@ -1174,14 +1174,14 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It * @param routeePaths string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(routeePaths: java.lang.Iterable[String], w: Duration) = + def this(routeePaths: java.lang.Iterable[String], w: FiniteDuration) = this(routees = iterableAsScalaIterable(routeePaths), within = w) /** * Constructor that sets the resizer to be used. * Java API */ - def this(resizer: Resizer, w: Duration) = this(resizer = Some(resizer), within = w) + def this(resizer: Resizer, w: FiniteDuration) = this(resizer = Some(resizer), within = w) /** * Java API for setting routerDispatcher @@ -1211,7 +1211,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def routees: Iterable[String] - def within: Duration + def within: FiniteDuration def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { diff --git a/akka-actor/src/main/scala/akka/util/Timeout.scala b/akka-actor/src/main/scala/akka/util/Timeout.scala index 7080775d9b..62faa56f3d 100644 --- a/akka-actor/src/main/scala/akka/util/Timeout.scala +++ b/akka-actor/src/main/scala/akka/util/Timeout.scala @@ -8,10 +8,10 @@ import language.implicitConversions import java.util.concurrent.TimeUnit import java.lang.{ Double ⇒ JDouble } -import scala.concurrent.util.Duration +import scala.concurrent.util.{ Duration, FiniteDuration } @SerialVersionUID(1L) -case class Timeout(duration: Duration) { +case class Timeout(duration: FiniteDuration) { def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) } @@ -26,17 +26,10 @@ object Timeout { */ val zero: Timeout = new Timeout(Duration.Zero) - /** - * A Timeout with infinite duration. Will never timeout. Use extreme caution with this - * as it may cause memory leaks, blocked threads, or may not even be supported by - * the receiver, which would result in an exception. - */ - val never: Timeout = new Timeout(Duration.Inf) - def apply(timeout: Long): Timeout = new Timeout(timeout) def apply(length: Long, unit: TimeUnit): Timeout = new Timeout(length, unit) - implicit def durationToTimeout(duration: Duration): Timeout = new Timeout(duration) + implicit def durationToTimeout(duration: FiniteDuration): Timeout = new Timeout(duration) implicit def intToTimeout(timeout: Int): Timeout = new Timeout(timeout) implicit def longToTimeout(timeout: Long): Timeout = new Timeout(timeout) } diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index 87fb5af568..5b786b2599 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -11,6 +11,7 @@ import akka.util.Timeout import scala.concurrent.stm._ import concurrent.{ ExecutionContext, Future, Promise, Await } import concurrent.util.Duration +import scala.concurrent.util.FiniteDuration /** * Used internally to send functions. @@ -240,7 +241,7 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem * Dispatch a function to update the internal state, and return a Future where that new state can be obtained * within the given timeout */ - def alter(f: JFunc[T, T], timeout: Duration): Future[T] = alter(x ⇒ f(x))(timeout) + def alter(f: JFunc[T, T], timeout: FiniteDuration): Future[T] = alter(x ⇒ f(x))(timeout) /** * Java API: @@ -259,7 +260,7 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem * or blocking operations. Dispatches using either `alterOff` or `alter` will * still be executed in order. */ - def alterOff(f: JFunc[T, T], timeout: Duration, ec: ExecutionContext): Unit = alterOff(x ⇒ f(x))(Timeout(timeout), ec) + def alterOff(f: JFunc[T, T], timeout: FiniteDuration, ec: ExecutionContext): Unit = alterOff(x ⇒ f(x))(Timeout(timeout), ec) /** * Java API: diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index 61bbae14f7..4af600215c 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -10,6 +10,7 @@ import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ import scala.concurrent.util.Duration import concurrent.{ ExecutionContext, Future } +import scala.concurrent.util.FiniteDuration /** * Activation trait that can be used to wait on activation or de-activation of Camel endpoints. @@ -27,7 +28,7 @@ trait Activation { * @param endpoint the endpoint to be activated * @param timeout the timeout for the Future */ - def activationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = + def activationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] = (activationTracker.ask(AwaitActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ case EndpointActivated(`endpoint`) ⇒ endpoint case EndpointFailedToActivate(`endpoint`, cause) ⇒ throw cause @@ -40,7 +41,7 @@ trait Activation { * @param endpoint the endpoint to be deactivated * @param timeout the timeout of the Future */ - def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: Duration, executor: ExecutionContext): Future[ActorRef] = + def deactivationFutureFor(endpoint: ActorRef)(implicit timeout: FiniteDuration, executor: ExecutionContext): Future[ActorRef] = (activationTracker.ask(AwaitDeActivation(endpoint))(Timeout(timeout))).map[ActorRef]({ case EndpointDeActivated(`endpoint`) ⇒ endpoint case EndpointFailedToDeActivate(`endpoint`, cause) ⇒ throw cause diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index 72daa89da0..89f7719e23 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -5,12 +5,12 @@ package akka.camel import language.postfixOps - import internal.component.DurationTypeConverter import org.apache.camel.model.{ RouteDefinition, ProcessorDefinition } import akka.actor._ import scala.concurrent.util.Duration import scala.concurrent.util.duration._ +import scala.concurrent.util.FiniteDuration /** * Mixed in by Actor implementations that consume message from Camel endpoints. @@ -41,7 +41,7 @@ trait ConsumerConfig { this: CamelSupport ⇒ /** * How long the actor should wait for activation before it fails. */ - def activationTimeout: Duration = camel.settings.activationTimeout + def activationTimeout: FiniteDuration = camel.settings.activationTimeout /** * When endpoint is out-capable (can produce responses) replyTimeout is the maximum time diff --git a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala index c69c2f55bf..37fec6e1d0 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/ConsumerRegistry.scala @@ -7,9 +7,7 @@ package akka.camel.internal import akka.camel._ import component.CamelPath import java.io.InputStream - import org.apache.camel.builder.RouteBuilder - import akka.actor._ import collection.mutable import org.apache.camel.model.RouteDefinition @@ -17,6 +15,7 @@ import org.apache.camel.CamelContext import scala.concurrent.util.Duration import concurrent.Await import akka.util.Timeout +import scala.concurrent.util.FiniteDuration /** * For internal use only. @@ -38,7 +37,7 @@ private[camel] trait ConsumerRegistry { this: Activation ⇒ * @param activationTimeout the timeout for activation * @return the actorRef to the consumer */ - private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: Duration) = { + private[camel] def registerConsumer(endpointUri: String, consumer: Consumer, activationTimeout: FiniteDuration) = { idempotentRegistry ! RegisterConsumer(endpointUri, consumer.self, consumer) Await.result(activationFutureFor(consumer.self)(activationTimeout, consumer.context.dispatcher), activationTimeout) } diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index df681ba98a..41aaacdf8c 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -8,8 +8,8 @@ import akka.event.Logging import akka.camel.{ CamelSettings, Camel } import scala.util.control.NonFatal import scala.concurrent.util.Duration - import org.apache.camel.{ ProducerTemplate, CamelContext } +import scala.concurrent.util.FiniteDuration /** * For internal use only. @@ -32,7 +32,7 @@ private[camel] class DefaultCamel(val system: ActorSystem) extends Camel { ctx.setName(system.name) ctx.setStreamCaching(true) ctx.addComponent("akka", new ActorComponent(this, system)) - ctx.getTypeConverterRegistry.addTypeConverter(classOf[Duration], classOf[String], DurationTypeConverter) + ctx.getTypeConverterRegistry.addTypeConverter(classOf[FiniteDuration], classOf[String], DurationTypeConverter) ctx } diff --git a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala index 159ec437c5..61c4cdb1fb 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/component/ActorComponent.scala @@ -5,15 +5,11 @@ package akka.camel.internal.component import language.postfixOps - import java.util.{ Map ⇒ JMap } - import org.apache.camel._ import org.apache.camel.impl.{ DefaultProducer, DefaultEndpoint, DefaultComponent } - import akka.actor._ import akka.pattern._ - import scala.reflect.BeanProperty import scala.concurrent.util.duration._ import scala.concurrent.util.Duration @@ -25,6 +21,7 @@ import akka.camel.internal.CamelExchangeAdapter import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage } import support.TypeConverterSupport import scala.util.{ Failure, Success, Try } +import scala.concurrent.util.FiniteDuration /** * For internal use only. @@ -98,7 +95,7 @@ private[camel] trait ActorEndpointConfig { def path: ActorEndpointPath def camel: Camel - @BeanProperty var replyTimeout: Duration = camel.settings.replyTimeout + @BeanProperty var replyTimeout: FiniteDuration = camel.settings.replyTimeout @BeanProperty var autoAck: Boolean = camel.settings.autoAck } diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java index b30f541bb1..e1e87f8903 100644 --- a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -4,20 +4,22 @@ package akka.camel; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.junit.AfterClass; +import org.junit.Test; + import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; -import org.junit.AfterClass; -import org.junit.Test; -import java.util.concurrent.TimeUnit; +import scala.concurrent.util.FiniteDuration; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import akka.testkit.AkkaSpec; -import akka.testkit.JavaTestKit.EventFilter; - -import static org.junit.Assert.assertEquals; +import akka.testkit.JavaTestKit; /** @@ -37,7 +39,7 @@ public class ConsumerJavaTestBase { new JavaTestKit(system) {{ String result = new EventFilter(Exception.class) { protected String run() { - Duration timeout = Duration.create(1, TimeUnit.SECONDS); + FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS); Camel camel = CamelExtension.get(system); ExecutionContext executionContext = system.dispatcher(); try { diff --git a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java index bb7bf4042f..5454dd0074 100644 --- a/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java +++ b/akka-camel/src/test/java/akka/camel/CustomRouteTestBase.java @@ -7,6 +7,7 @@ import akka.camel.javaapi.UntypedProducerActor; import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.util.Duration; +import scala.concurrent.util.FiniteDuration; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; import org.apache.camel.Predicate; @@ -59,7 +60,7 @@ public class CustomRouteTestBase { @Test public void testCustomConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockConsumer", MockEndpoint.class); - Duration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props(TestConsumer.class), "testConsumer"), timeout, executionContext), @@ -73,7 +74,7 @@ public class CustomRouteTestBase { @Test public void testCustomAckConsumerRoute() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAck", MockEndpoint.class); - Duration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor( @@ -91,7 +92,7 @@ public class CustomRouteTestBase { public void testCustomAckConsumerRouteFromUri() throws Exception { MockEndpoint mockEndpoint = camel.context().getEndpoint("mock:mockAckUri", MockEndpoint.class); ExecutionContext executionContext = system.dispatcher(); - Duration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerAckFromUri","mock:mockAckUri"); } }), "testConsumerAckUri"), timeout, executionContext), @@ -104,7 +105,7 @@ public class CustomRouteTestBase { @Test(expected=CamelExecutionException.class) public void testCustomTimeoutConsumerRoute() throws Exception { - Duration timeout = Duration.create(10, TimeUnit.SECONDS); + FiniteDuration timeout = Duration.create(10, TimeUnit.SECONDS); ExecutionContext executionContext = system.dispatcher(); ActorRef consumer = Await.result( camel.activationFutureFor(system.actorOf(new Props( new UntypedActorFactory(){ public Actor create() { return new TestAckConsumer("direct:testConsumerException","mock:mockException"); } }), "testConsumerException"), diff --git a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala index 8b251c4f9a..ddff6017ec 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/ActivationTrackerTest.scala @@ -1,7 +1,6 @@ package akka.camel.internal import language.postfixOps - import org.scalatest.matchers.MustMatchers import scala.concurrent.util.duration._ import org.scalatest.{ GivenWhenThen, BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } @@ -9,6 +8,7 @@ import akka.actor.{ Props, ActorSystem } import scala.concurrent.util.Duration import akka.camel._ import akka.testkit.{ TimingTest, TestProbe, TestKit } +import scala.concurrent.util.FiniteDuration class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach with GivenWhenThen { @@ -115,11 +115,11 @@ class ActivationTrackerTest extends TestKit(ActorSystem("test")) with WordSpec w val probe = TestProbe() def awaitActivation() = at.tell(AwaitActivation(actor.ref), probe.ref) def awaitDeActivation() = at.tell(AwaitDeActivation(actor.ref), probe.ref) - def verifyActivated(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointActivated(actor.ref)) } - def verifyDeActivated(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointDeActivated(actor.ref)) } + def verifyActivated(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointActivated(actor.ref)) } + def verifyDeActivated(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointDeActivated(actor.ref)) } - def verifyFailedToActivate(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToActivate(actor.ref, cause)) } - def verifyFailedToDeActivate(timeout: Duration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToDeActivate(actor.ref, cause)) } + def verifyFailedToActivate(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToActivate(actor.ref, cause)) } + def verifyFailedToDeActivate(timeout: FiniteDuration = 50 millis) = within(timeout) { probe.expectMsg(EndpointFailedToDeActivate(actor.ref, cause)) } } diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index d4e1ac0ad5..09f9431cc9 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -5,7 +5,6 @@ package akka.camel.internal.component import language.postfixOps - import org.scalatest.mock.MockitoSugar import org.mockito.Matchers.any import org.mockito.Mockito._ @@ -27,6 +26,7 @@ import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem.Settings import akka.event.LoggingAdapter import akka.testkit.{ TimingTest, TestKit, TestProbe } +import scala.concurrent.util.FiniteDuration class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with MustMatchers with ActorProducerFixture { @@ -303,7 +303,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo def msg(s: String) = CamelMessage(s, Map.empty) - def given(actor: ActorRef = probe.ref, outCapable: Boolean = true, autoAck: Boolean = true, replyTimeout: Duration = Int.MaxValue seconds) = { + def given(actor: ActorRef = probe.ref, outCapable: Boolean = true, autoAck: Boolean = true, replyTimeout: FiniteDuration = Int.MaxValue seconds) = { prepareMocks(actor, outCapable = outCapable) new ActorProducer(configure(isAutoAck = autoAck, _replyTimeout = replyTimeout), camel) } @@ -325,16 +325,16 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo callbackReceived.countDown() } - private[this] def valueWithin(implicit timeout: Duration) = + private[this] def valueWithin(implicit timeout: FiniteDuration) = if (!callbackReceived.await(timeout.toNanos, TimeUnit.NANOSECONDS)) fail("Callback not received!") else callbackValue.get - def expectDoneSyncWithin(implicit timeout: Duration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously") - def expectDoneAsyncWithin(implicit timeout: Duration): Unit = if (valueWithin(timeout)) fail("Expected to be done Asynchronously") + def expectDoneSyncWithin(implicit timeout: FiniteDuration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously") + def expectDoneAsyncWithin(implicit timeout: FiniteDuration): Unit = if (valueWithin(timeout)) fail("Expected to be done Asynchronously") } - def configure(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: Duration = Int.MaxValue seconds) = { + def configure(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: FiniteDuration = Int.MaxValue seconds) = { val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel) endpoint.autoAck = isAutoAck endpoint.replyTimeout = _replyTimeout diff --git a/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java index f8df40f30a..071affb831 100644 --- a/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -27,7 +27,7 @@ public class DangerousJavaActor extends UntypedActor { public DangerousJavaActor() { this.breaker = new CircuitBreaker( getContext().dispatcher(), getContext().system().scheduler(), - 5, Duration.parse("10s"), Duration.parse("1m")) + 5, Duration.create(10, "s"), Duration.create(1, "m")) .onOpen(new Callable() { public Object call() throws Exception { notifyMeOnOpen(); diff --git a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java index ff1e243aaf..75f102192e 100644 --- a/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -109,7 +109,7 @@ public class FaultHandlingDocSample { */ public static class Worker extends UntypedActor { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - final Timeout askTimeout = new Timeout(Duration.parse("5 seconds")); + final Timeout askTimeout = new Timeout(Duration.create(5, "seconds")); // The sender of the initial Start message will continuously be notified about progress ActorRef progressListener; diff --git a/akka-docs/java/code/docs/future/FutureDocTestBase.java b/akka-docs/java/code/docs/future/FutureDocTestBase.java index 9ee643c17e..5cc84f9935 100644 --- a/akka-docs/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/docs/future/FutureDocTestBase.java @@ -106,7 +106,7 @@ public class FutureDocTestBase { ActorRef actor = system.actorOf(new Props(MyActor.class)); String msg = "hello"; //#ask-blocking - Timeout timeout = new Timeout(Duration.parse("5 seconds")); + Timeout timeout = new Timeout(Duration.create(5, "seconds")); Future future = Patterns.ask(actor, msg, timeout); String result = (String) Await.result(future, timeout.duration()); //#ask-blocking diff --git a/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java b/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java index 42ebc6f89e..d47419ee60 100644 --- a/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java +++ b/akka-docs/java/code/docs/jrouting/CustomRouterDocTestBase.java @@ -72,7 +72,7 @@ public class CustomRouterDocTestBase { routedActor.tell(RepublicanVote); routedActor.tell(DemocratVote); routedActor.tell(RepublicanVote); - Timeout timeout = new Timeout(Duration.parse("1 seconds")); + Timeout timeout = new Timeout(Duration.create(1, "seconds")); Future democratsResult = ask(routedActor, DemocratCountResult, timeout); Future republicansResult = ask(routedActor, RepublicanCountResult, timeout); diff --git a/akka-docs/java/code/docs/jrouting/ParentActor.java b/akka-docs/java/code/docs/jrouting/ParentActor.java index c21aed2dc6..21ce628964 100644 --- a/akka-docs/java/code/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/docs/jrouting/ParentActor.java @@ -53,8 +53,8 @@ public class ParentActor extends UntypedActor { //#scatterGatherFirstCompletedRouter ActorRef scatterGatherFirstCompletedRouter = getContext().actorOf( new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration - .parse("2 seconds"))), "router"); - Timeout timeout = new Timeout(Duration.parse("5 seconds")); + .create(2, "seconds"))), "router"); + Timeout timeout = new Timeout(Duration.create(5, "seconds")); Future futureResult = akka.pattern.Patterns.ask(scatterGatherFirstCompletedRouter, new FibonacciActor.FibonacciNumber(10), timeout); int result = (Integer) Await.result(futureResult, timeout.duration()); diff --git a/akka-docs/java/code/docs/testkit/TestKitDocTest.java b/akka-docs/java/code/docs/testkit/TestKitDocTest.java index 0b8b5c7a4e..f0c2263c3e 100644 --- a/akka-docs/java/code/docs/testkit/TestKitDocTest.java +++ b/akka-docs/java/code/docs/testkit/TestKitDocTest.java @@ -98,7 +98,7 @@ public class TestKitDocTest { //#test-within new JavaTestKit(system) {{ getRef().tell(42); - new Within(Duration.Zero(), Duration.parse("1 second")) { + new Within(Duration.Zero(), Duration.create(1, "second")) { // do not put code outside this method, will run afterwards public void run() { assertEquals((Integer) 42, expectMsgClass(Integer.class)); diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index fd5629ea3b..2444e4c791 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -239,7 +239,7 @@ If the target actor of ``akka.pattern.gracefulStop`` isn't terminated within the timeout the ``Future`` is completed with failure ``akka.pattern.AskTimeoutException``. In 2.0 it was ``akka.actor.ActorTimeoutException``. -getInstance for singeltons - Java +getInstance for Singletons - Java ==================================== v2.0:: @@ -358,4 +358,30 @@ v2.1:: else if (requestedCapacity < 0) routeeProvider.removeRoutees( -requestedCapacity, stopDelay) +Duration and Timeout +==================== +The Duration class in the scala library is an improved version of the previous +:class:`akka.util.Duration`. Among others it keeps the static type of +:class:`FiniteDuration` more consistently, which has been used to tighten APIs. +The advantage is that instead of runtime exceptions you’ll get compiler errors +telling you if you try to pass a possibly non-finite duration where it does not +belong. + +The main source incompatibility is that you may have to change the declared +type of fields from ``Duration`` to ``FiniteDuration`` (factory methods already +return the more precise type wherever possible). + +Another change is that ``Duration.parse`` was not accepted by the scala-library +maintainers, use ``Duration.create`` instead. + +v2.0:: + + final Duration d = Duration.parse("1 second"); + final Timeout t = new Timeout(d); + +v2.1:: + + final FiniteDuration d = Duration.create("1 second"); + final Timeout t = new Timeout(d); // always required finite duration, now also in type + diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 2e22a0d819..2b18bdbabb 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -4,7 +4,6 @@ package akka.remote.testconductor import language.postfixOps - import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props, PoisonPill, Status, Address, Scheduler } import RemoteConnection.getAddrString import scala.concurrent.util.{ Duration, Deadline } @@ -20,6 +19,7 @@ import akka.event.{ LoggingAdapter, Logging } import java.net.{ InetSocketAddress, ConnectException } import scala.reflect.classTag import concurrent.{ ExecutionContext, Await, Future } +import scala.concurrent.util.FiniteDuration /** * The Player is the client component of the @@ -81,13 +81,13 @@ trait Player { this: TestConductorExt ⇒ system.log.debug("entering barriers " + name.mkString("(", ", ", ")")) val stop = Deadline.now + timeout.duration name foreach { b ⇒ - val barrierTimeout = stop.timeLeft + val barrierTimeout = stop.timeLeft.asInstanceOf[FiniteDuration] if (barrierTimeout < Duration.Zero) { client ! ToServer(FailBarrier(b)) throw new TimeoutException("Server timed out while waiting for barrier " + b); } try { - implicit val timeout = Timeout(barrierTimeout + Settings.QueryTimeout.duration) + implicit val timeout = Timeout((barrierTimeout + Settings.QueryTimeout.duration).asInstanceOf[FiniteDuration]) Await.result(client ? ToServer(EnterBarrier(b, Option(barrierTimeout))), Duration.Inf) } catch { case e: AskTimeoutException ⇒ diff --git a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java index e8ce90a9ed..88fe0d940e 100644 --- a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java +++ b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java @@ -11,6 +11,7 @@ import akka.event.Logging.LogEvent; import akka.japi.JavaPartialFunction; import akka.japi.Util; import scala.concurrent.util.Duration; +import scala.concurrent.util.FiniteDuration; /** * Java API for the TestProbe. Proper JavaDocs to come once JavaDoccing is implemented. @@ -30,8 +31,10 @@ public class JavaTestKit { return p.system(); } - static public Duration duration(String s) { - return Duration.parse(s); + static public FiniteDuration duration(String s) { + final Duration ret = Duration.apply(s); + if (ret instanceof FiniteDuration) return (FiniteDuration) ret; + else throw new IllegalArgumentException("duration() is only for finite durations, use Duration.Inf() and friends"); } public Duration dilated(Duration d) { @@ -58,11 +61,11 @@ public class JavaTestKit { p.lastMessage().sender().tell(msg, p.ref()); } - public Duration getRemainingTime() { + public FiniteDuration getRemainingTime() { return p.remaining(); } - public Duration getRemainingTimeOr(Duration def) { + public FiniteDuration getRemainingTimeOr(FiniteDuration def) { return p.remainingOr(def); } @@ -97,7 +100,7 @@ public class JavaTestKit { public abstract class Within { protected abstract void run(); - public Within(Duration max) { + public Within(FiniteDuration max) { p.within(max, new AbstractFunction0() { public Object apply() { run(); @@ -106,7 +109,7 @@ public class JavaTestKit { }); } - public Within(Duration min, Duration max) { + public Within(FiniteDuration min, FiniteDuration max) { p.within(min, max, new AbstractFunction0() { public Object apply() { run(); @@ -168,7 +171,7 @@ public class JavaTestKit { return p.expectMsg(msg); } - public T expectMsgEquals(Duration max, T msg) { + public T expectMsgEquals(FiniteDuration max, T msg) { return p.expectMsg(max, msg); } @@ -176,7 +179,7 @@ public class JavaTestKit { return p.expectMsgClass(clazz); } - public T expectMsgClass(Duration max, Class clazz) { + public T expectMsgClass(FiniteDuration max, Class clazz) { return p.expectMsgClass(max, clazz); } @@ -184,7 +187,7 @@ public class JavaTestKit { return p.expectMsgAnyOf(Util.arrayToSeq(msgs)); } - public Object expectMsgAnyOf(Duration max, Object... msgs) { + public Object expectMsgAnyOf(FiniteDuration max, Object... msgs) { return p.expectMsgAnyOf(max, Util.arrayToSeq(msgs)); } @@ -193,7 +196,7 @@ public class JavaTestKit { Util.classTag(Object.class)); } - public Object[] expectMsgAllOf(Duration max, Object... msgs) { + public Object[] expectMsgAllOf(FiniteDuration max, Object... msgs) { return (Object[]) p.expectMsgAllOf(max, Util.arrayToSeq(msgs)).toArray( Util.classTag(Object.class)); } @@ -204,7 +207,7 @@ public class JavaTestKit { return (T) result; } - public Object expectMsgAnyClassOf(Duration max, Class... classes) { + public Object expectMsgAnyClassOf(FiniteDuration max, Class... classes) { return p.expectMsgAnyClassOf(max, Util.arrayToSeq(classes)); } @@ -212,7 +215,7 @@ public class JavaTestKit { p.expectNoMsg(); } - public void expectNoMsg(Duration max) { + public void expectNoMsg(FiniteDuration max) { p.expectNoMsg(max); } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala index dd13c22309..929838a8b5 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala @@ -7,6 +7,7 @@ package akka.testkit import scala.concurrent.util.Duration import java.util.concurrent.{ CyclicBarrier, TimeUnit, TimeoutException } import akka.actor.ActorSystem +import scala.concurrent.util.FiniteDuration class TestBarrierTimeoutException(message: String) extends RuntimeException(message) @@ -27,7 +28,7 @@ class TestBarrier(count: Int) { def await()(implicit system: ActorSystem): Unit = await(TestBarrier.DefaultTimeout) - def await(timeout: Duration)(implicit system: ActorSystem) { + def await(timeout: FiniteDuration)(implicit system: ActorSystem) { try { barrier.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) } catch { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index a1721d2ffe..a187dfbeef 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -7,7 +7,7 @@ import language.postfixOps import akka.actor._ import akka.actor.Actor._ -import scala.concurrent.util.Duration +import scala.concurrent.util.{ Duration, FiniteDuration } import scala.concurrent.util.duration._ import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atomic } import atomic.AtomicInteger @@ -173,20 +173,30 @@ trait TestKitBase { /** * Obtain current time (`System.nanoTime`) as Duration. */ - def now: Duration = System.nanoTime.nanos + def now: FiniteDuration = System.nanoTime.nanos /** * Obtain time remaining for execution of the innermost enclosing `within` * block or missing that it returns the properly dilated default for this * case from settings (key "akka.test.single-expect-default"). */ - def remaining: Duration = remainingOr(testKitSettings.SingleExpectDefaultTimeout.dilated) + def remaining: FiniteDuration = remainingOr(testKitSettings.SingleExpectDefaultTimeout.dilated) /** * Obtain time remaining for execution of the innermost enclosing `within` * block or missing that it returns the given duration. */ - def remainingOr(duration: Duration): Duration = if (end == Duration.Undefined) duration else end - now + def remainingOr(duration: FiniteDuration): FiniteDuration = end match { + case x if x eq Duration.Undefined ⇒ duration + case x if !x.isFinite ⇒ throw new IllegalArgumentException("`end` cannot be infinite") + case f: FiniteDuration ⇒ (end - now).asInstanceOf[FiniteDuration] // RK FIXME after next Scala milestone + } + + private def remainingOrDilated(max: Duration): FiniteDuration = max match { + case x if x eq Duration.Undefined ⇒ remaining + case x if !x.isFinite ⇒ throw new IllegalArgumentException("max duration cannot be infinite") + case f: FiniteDuration ⇒ f.dilated + } /** * Query queue status. @@ -204,7 +214,7 @@ trait TestKitBase { * which uses the configuration entry "akka.test.timefactor". */ def awaitCond(p: ⇒ Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis) { - val _max = if (max eq Duration.Undefined) remaining else max.dilated + val _max = remainingOrDilated(max) val stop = now + _max @tailrec @@ -235,7 +245,7 @@ trait TestKitBase { * } * */ - def within[T](min: Duration, max: Duration)(f: ⇒ T): T = { + def within[T](min: FiniteDuration, max: FiniteDuration)(f: ⇒ T): T = { val _max = max.dilated val start = now val rem = if (end == Duration.Undefined) Duration.Inf else end - start @@ -261,7 +271,7 @@ trait TestKitBase { /** * Same as calling `within(0 seconds, max)(f)`. */ - def within[T](max: Duration)(f: ⇒ T): T = within(0 seconds, max)(f) + def within[T](max: FiniteDuration)(f: ⇒ T): T = within(0 seconds, max)(f) /** * Same as `expectMsg(remaining, obj)`, but correctly treating the timeFactor. @@ -275,7 +285,7 @@ trait TestKitBase { * * @return the received object */ - def expectMsg[T](max: Duration, obj: T): T = expectMsg_internal(max.dilated, obj) + def expectMsg[T](max: FiniteDuration, obj: T): T = expectMsg_internal(max.dilated, obj) private def expectMsg_internal[T](max: Duration, obj: T): T = { val o = receiveOne(max) @@ -295,7 +305,7 @@ trait TestKitBase { * @return the received object as transformed by the partial function */ def expectMsgPF[T](max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, T]): T = { - val _max = if (max eq Duration.Undefined) remaining else max.dilated + val _max = remainingOrDilated(max) val o = receiveOne(_max) assert(o ne null, "timeout (" + _max + ") during expectMsg: " + hint) assert(f.isDefinedAt(o), "expected: " + hint + " but got unexpected message " + o) @@ -311,7 +321,7 @@ trait TestKitBase { * partial function returned true */ def fishForMessage(max: Duration = Duration.Undefined, hint: String = "")(f: PartialFunction[Any, Boolean]): Any = { - val _max = if (max eq Duration.Undefined) remaining else max.dilated + val _max = remainingOrDilated(max) val end = now + _max @tailrec def recv: Any = { @@ -335,7 +345,7 @@ trait TestKitBase { * * @return the received object */ - def expectMsgType[T](max: Duration)(implicit t: ClassTag[T]): T = expectMsgClass_internal(max.dilated, t.runtimeClass.asInstanceOf[Class[T]]) + def expectMsgType[T](max: FiniteDuration)(implicit t: ClassTag[T]): T = expectMsgClass_internal(max.dilated, t.runtimeClass.asInstanceOf[Class[T]]) /** * Same as `expectMsgClass(remaining, c)`, but correctly treating the timeFactor. @@ -349,9 +359,9 @@ trait TestKitBase { * * @return the received object */ - def expectMsgClass[C](max: Duration, c: Class[C]): C = expectMsgClass_internal(max.dilated, c) + def expectMsgClass[C](max: FiniteDuration, c: Class[C]): C = expectMsgClass_internal(max.dilated, c) - private def expectMsgClass_internal[C](max: Duration, c: Class[C]): C = { + private def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = { val o = receiveOne(max) assert(o ne null, "timeout (" + max + ") during expectMsgClass waiting for " + c) assert(BoxedType(c) isInstance o, "expected " + c + ", found " + o.getClass) @@ -370,9 +380,9 @@ trait TestKitBase { * * @return the received object */ - def expectMsgAnyOf[T](max: Duration, obj: T*): T = expectMsgAnyOf_internal(max.dilated, obj: _*) + def expectMsgAnyOf[T](max: FiniteDuration, obj: T*): T = expectMsgAnyOf_internal(max.dilated, obj: _*) - private def expectMsgAnyOf_internal[T](max: Duration, obj: T*): T = { + private def expectMsgAnyOf_internal[T](max: FiniteDuration, obj: T*): T = { val o = receiveOne(max) assert(o ne null, "timeout (" + max + ") during expectMsgAnyOf waiting for " + obj.mkString("(", ", ", ")")) assert(obj exists (_ == o), "found unexpected " + o) @@ -391,9 +401,9 @@ trait TestKitBase { * * @return the received object */ - def expectMsgAnyClassOf[C](max: Duration, obj: Class[_ <: C]*): C = expectMsgAnyClassOf_internal(max.dilated, obj: _*) + def expectMsgAnyClassOf[C](max: FiniteDuration, obj: Class[_ <: C]*): C = expectMsgAnyClassOf_internal(max.dilated, obj: _*) - private def expectMsgAnyClassOf_internal[C](max: Duration, obj: Class[_ <: C]*): C = { + private def expectMsgAnyClassOf_internal[C](max: FiniteDuration, obj: Class[_ <: C]*): C = { val o = receiveOne(max) assert(o ne null, "timeout (" + max + ") during expectMsgAnyClassOf waiting for " + obj.mkString("(", ", ", ")")) assert(obj exists (c ⇒ BoxedType(c) isInstance o), "found unexpected " + o) @@ -418,9 +428,9 @@ trait TestKitBase { * expectMsgAllOf(1 second, Result1(), Result2()) * */ - def expectMsgAllOf[T](max: Duration, obj: T*): Seq[T] = expectMsgAllOf_internal(max.dilated, obj: _*) + def expectMsgAllOf[T](max: FiniteDuration, obj: T*): Seq[T] = expectMsgAllOf_internal(max.dilated, obj: _*) - private def expectMsgAllOf_internal[T](max: Duration, obj: T*): Seq[T] = { + private def expectMsgAllOf_internal[T](max: FiniteDuration, obj: T*): Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (x == _), "not found " + x)) recv foreach (x ⇒ assert(obj exists (x == _), "found unexpected " + x)) @@ -440,9 +450,9 @@ trait TestKitBase { * Wait time is bounded by the given duration, with an AssertionFailure * being thrown in case of timeout. */ - def expectMsgAllClassOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(max.dilated, obj: _*) + def expectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(max.dilated, obj: _*) - private def internalExpectMsgAllClassOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { + private def internalExpectMsgAllClassOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (_.getClass eq BoxedType(x)), "not found " + x)) recv foreach (x ⇒ assert(obj exists (c ⇒ BoxedType(c) eq x.getClass), "found non-matching object " + x)) @@ -465,9 +475,9 @@ trait TestKitBase { * Beware that one object may satisfy all given class constraints, which * may be counter-intuitive. */ - def expectMsgAllConformingOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(max.dilated, obj: _*) + def expectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(max.dilated, obj: _*) - private def internalExpectMsgAllConformingOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { + private def internalExpectMsgAllConformingOf[T](max: FiniteDuration, obj: Class[_ <: T]*): Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (BoxedType(x) isInstance _), "not found " + x)) recv foreach (x ⇒ assert(obj exists (c ⇒ BoxedType(c) isInstance x), "found non-matching object " + x)) @@ -482,9 +492,9 @@ trait TestKitBase { /** * Assert that no message is received for the specified time. */ - def expectNoMsg(max: Duration) { expectNoMsg_internal(max.dilated) } + def expectNoMsg(max: FiniteDuration) { expectNoMsg_internal(max.dilated) } - private def expectNoMsg_internal(max: Duration) { + private def expectNoMsg_internal(max: FiniteDuration) { val o = receiveOne(max) assert(o eq null, "received unexpected message " + o) lastWasNoMsg = true @@ -509,7 +519,7 @@ trait TestKitBase { * */ def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[AnyRef, T]): Seq[T] = { - val stop = now + (if (max eq Duration.Undefined) remaining else max.dilated) + val stop = now + remainingOrDilated(max) var msg: Message = NullMessage @tailrec @@ -546,7 +556,7 @@ trait TestKitBase { /** * Receive N messages in a row before the given deadline. */ - def receiveN(n: Int, max: Duration): Seq[AnyRef] = receiveN_internal(n, max.dilated) + def receiveN(n: Int, max: FiniteDuration): Seq[AnyRef] = receiveN_internal(n, max.dilated) private def receiveN_internal(n: Int, max: Duration): Seq[AnyRef] = { val stop = max + now diff --git a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala index f045552d44..cedf351551 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala @@ -8,6 +8,7 @@ import scala.concurrent.util.Duration import akka.actor.ActorSystem import scala.concurrent.{ Await, CanAwait, Awaitable } import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } +import scala.concurrent.util.FiniteDuration /** * A count down latch wrapper for use in testing. @@ -32,7 +33,11 @@ class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends Awaitable[ @throws(classOf[TimeoutException]) def ready(atMost: Duration)(implicit permit: CanAwait) = { - val opened = latch.await(atMost.dilated.toNanos, TimeUnit.NANOSECONDS) + val waitTime = atMost match { + case f: FiniteDuration ⇒ f + case _ ⇒ throw new IllegalArgumentException("TestLatch does not support waiting for " + atMost) + } + val opened = latch.await(waitTime.dilated.toNanos, TimeUnit.NANOSECONDS) if (!opened) throw new TimeoutException( "Timeout of %s with time factor of %s" format (atMost.toString, TestKitExtension(system).TestTimeFactor)) this diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index e9d37d8e70..247cf9e17f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -3,7 +3,7 @@ package akka import language.implicitConversions import akka.actor.ActorSystem -import scala.concurrent.util.Duration +import scala.concurrent.util.{ Duration, FiniteDuration } import java.util.concurrent.TimeUnit.MILLISECONDS import scala.reflect.ClassTag @@ -41,14 +41,15 @@ package object testkit { * * Corresponding Java API is available in TestKit.dilated */ - implicit def duration2TestDuration(duration: Duration) = new TestDuration(duration) + implicit def duration2TestDuration(duration: FiniteDuration) = new TestDuration(duration) /** * Wrapper for implicit conversion to add dilated function to Duration. */ - class TestDuration(duration: Duration) { - def dilated(implicit system: ActorSystem): Duration = { - duration * TestKitExtension(system).TestTimeFactor + class TestDuration(duration: FiniteDuration) { + def dilated(implicit system: ActorSystem): FiniteDuration = { + // this cast will succeed unless TestTimeFactor is non-finite (which would be a misconfiguration) + (duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration] } } }