diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala index b42ac75bcb..a10100497a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala @@ -35,21 +35,10 @@ class ActorTimeoutSpec } } - "use implicitly supplied timeout" in { - implicit val timeout = Actor.Timeout(testTimeout) - within(testTimeout - 100.millis, testTimeout + 300.millis) { - val f = (echo ? "hallo").mapTo[String] - intercept[FutureTimeoutException] { f.await } - f.value must be(None) - } - } - "use explicitly supplied timeout" in { within(testTimeout - 100.millis, testTimeout + 300.millis) { - (echo.?("hallo")(timeout = testTimeout)).as[String] must be(None) + (echo.?("hallo", testTimeout)).as[String] must be(None) } } - } - } diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index 3fb0908402..8c5f397387 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -66,7 +66,7 @@ object SupervisorSpec { } override def receive = { - case Die ⇒ (temp.?(Die)(timeout = TimeoutMillis)).get + case Die ⇒ (temp.?(Die, TimeoutMillis)).get } } @@ -200,7 +200,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach } def ping(pingPongActor: ActorRef) = { - (pingPongActor.?(Ping)(timeout = TimeoutMillis)).as[String].getOrElse("nil") must be === PongMessage + (pingPongActor.?(Ping, TimeoutMillis)).as[String].getOrElse("nil") must be === PongMessage messageLogPoll must be === PingMessage } @@ -215,7 +215,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach val master = actorOf[Master].start() intercept[RuntimeException] { - (master.?(Die)(timeout = TimeoutMillis)).get + (master.?(Die, TimeoutMillis)).get } sleepFor(1 second) @@ -226,7 +226,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach val (temporaryActor, supervisor) = temporaryActorAllForOne intercept[RuntimeException] { - (temporaryActor.?(Die)(timeout = TimeoutMillis)).get + (temporaryActor.?(Die, TimeoutMillis)).get } sleepFor(1 second) @@ -374,13 +374,13 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach Supervise(dyingActor, Permanent) :: Nil)) intercept[Exception] { - (dyingActor.?(Die)(timeout = TimeoutMillis)).get + (dyingActor.?(Die, TimeoutMillis)).get } // give time for restart sleepFor(3 seconds) - (dyingActor.?(Ping)(timeout = TimeoutMillis)).as[String].getOrElse("nil") must be === PongMessage + (dyingActor.?(Ping, TimeoutMillis)).as[String].getOrElse("nil") must be === PongMessage inits.get must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala index 83d6bc21eb..75f984065c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala @@ -45,7 +45,7 @@ class DispatcherActorSpec extends JUnitSuite { @Test def shouldSendReplySync = { val actor = actorOf[TestActor].start() - val result = (actor.?("Hello")(timeout = 10000)).as[String] + val result = (actor.?("Hello", 10000)).as[String] assert("World" === result.get) actor.stop() } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index df829177b9..de81074303 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -180,7 +180,7 @@ class FutureSpec extends JUnitSuite { }).start() } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout = timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } assert(Futures.fold(0, timeout)(futures)(_ + _).await.result.get === 45) } @@ -191,7 +191,7 @@ class FutureSpec extends JUnitSuite { def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add } }).start() } - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout = 10000).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] } assert(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)).get === 45) } @@ -208,7 +208,7 @@ class FutureSpec extends JUnitSuite { }).start() } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout = timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } assert(Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") } @@ -225,7 +225,7 @@ class FutureSpec extends JUnitSuite { }).start() } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout = timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } assert(Futures.reduce(futures, timeout)(_ + _).get === 45) } @@ -242,7 +242,7 @@ class FutureSpec extends JUnitSuite { }).start() } val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout = timeout).mapTo[Int] } + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala index 4744e8f451..a3d647fcf3 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala @@ -43,7 +43,7 @@ class PinnedActorSpec extends JUnitSuite { @Test def shouldSendReplySync = { val actor = actorOf[TestActor].start() - val result = (actor.?("Hello")(timeout = 10000)).as[String] + val result = (actor.?("Hello", 10000)).as[String] assert("World" === result.get) actor.stop() } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 126eb3ba45..9c1704b7f5 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -238,7 +238,7 @@ class RoutingSpec extends WordSpec with MustMatchers { }).start() try { - (for (count ← 1 to 500) yield pool.?("Test")(timeout = 20000)) foreach { + (for (count ← 1 to 500) yield pool.?("Test", 20000)) foreach { _.await.resultOrException.get must be("Response") } } finally { diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index ba2a84196d..b838f33efc 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -28,7 +28,7 @@ class Ticket703Spec extends WordSpec with MustMatchers { } }) }).start() - (actorPool.?("Ping")(timeout = 7000)).await.result must be === Some("Response") + (actorPool.?("Ping", 7000)).await.result must be === Some("Response") } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 429b686d5b..e4031962c9 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -89,8 +89,11 @@ class InvalidMessageException private[akka] (message: String, cause: Throwable = /** * This message is thrown by default when an Actors behavior doesn't match a message */ -case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception { - override def getMessage = "Actor %s does not handle [%s]".format(ref, msg) +case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exception { + // constructor with 'null' ActorRef needed to work with client instantiation of remote exception + override def getMessage = + if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg) + else "Actor does not handle [%s]".format(msg) override def fillInStackTrace() = this //Don't waste cycles generating stack trace } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index dea6f39018..7694663c95 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -248,7 +248,8 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com * If you are sending messages using ask then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def ask(message: AnyRef, timeout: Long, sender: ActorRef): Future[AnyRef] = ?(message)(sender, Actor.Timeout(timeout)).asInstanceOf[Future[AnyRef]] + def ask(message: AnyRef, timeout: Long, sender: ActorRef): Future[AnyRef] = + ?(message, Actor.Timeout(timeout))(sender).asInstanceOf[Future[AnyRef]] /** * Akka Java API.

@@ -758,7 +759,8 @@ class LocalActorRef private[akka] ( } def tooManyRestarts() { - notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)) + notifySupervisorWithMessage( + MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)) stop() } @@ -1170,7 +1172,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR /** * Sends a message asynchronously, returning a future which may eventually hold the reply. */ - def ?(message: Any)(implicit channel: UntypedChannel = NullChannel, timeout: Actor.Timeout = Actor.defaultTimeout): Future[Any] = { + def ?(message: Any, timeout: Actor.Timeout = Actor.defaultTimeout)(implicit channel: UntypedChannel = NullChannel): Future[Any] = { if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout.duration.toMillis, channel) else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") @@ -1184,7 +1186,8 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR def forward(message: Any)(implicit channel: ForwardableChannel) = { if (isRunning) { postMessageToMailbox(message, channel.channel) - } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start()' before using it") + } else throw new ActorInitializationException( + "Actor has not been started, you need to invoke 'actor.start()' before using it") } /** diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 0ba0e44d5c..1e7c095f39 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -41,15 +41,15 @@ object TypedActor { case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ - implicit val timeout = Actor.Timeout(actor.timeout) + val timeout = Actor.Timeout(actor.timeout) MethodCall(method, args) match { case m if m.isOneWay ⇒ actor ! m null case m if m.returnsFuture_? ⇒ - actor ? m + actor ? (m, timeout) case m if m.returnsJOption_? || m.returnsOption_? ⇒ - val f = actor ? m + val f = actor ? (m, timeout) try { f.await } catch { case _: FutureTimeoutException ⇒ } f.value match { case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None @@ -57,7 +57,7 @@ object TypedActor { case Some(Left(ex)) ⇒ throw ex } case m ⇒ - (actor ? m).get.asInstanceOf[AnyRef] + (actor ? (m, timeout)).get.asInstanceOf[AnyRef] } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 02cd02f1df..aa96e58845 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -311,18 +311,19 @@ sealed trait Future[+T] { /** * Await completion of this Future (as `await`) and return its value if it * conforms to A's erased type. + * + * def as[A](implicit m: Manifest[A]): Option[A] = + * try { + * await + * value match { + * case None ⇒ None + * case Some(_: Left[_, _]) ⇒ None + * case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) + * } + * } catch { + * case _: Exception ⇒ None + * } */ - def as[A](implicit m: Manifest[A]): Option[A] = - try { - await - value match { - case None ⇒ None - case Some(_: Left[_, _]) ⇒ None - case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) - } - } catch { - case _: Exception ⇒ None - } /** * Tests whether this Future has been completed. diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index cc9fbfaf2f..73eb117f0b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -676,7 +676,7 @@ class DefaultClusterNode private[akka] ( case None ⇒ val error = new ClusterException( - "Operation to instantiate replicas throughout the cluster timed out, cause of error unknow") + "Operation to instantiate replicas throughout the cluster timed out") EventHandler.error(error, this, error.toString) throw error } diff --git a/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala index a4bda26c77..84f91a0453 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala @@ -11,7 +11,7 @@ class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfte val logPath = "_akka_cluster/log" var zkServer: ZkServer = _ var zkClient: AkkaZkClient = _ - + /* override def beforeAll() { try { zkServer = Cluster.startLocalCluster(dataPath, logPath) @@ -30,10 +30,10 @@ class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfte Cluster.shutdownLocalCluster() Actor.registry.local.shutdownAll() } - +*/ "unversioned load" must { "throw MissingDataException if non existing key" in { - val store = new ZooKeeperStorage(zkClient) + // val store = new ZooKeeperStorage(zkClient) //try { // store.load("foo") diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala index 52f85fead1..668acb3376 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala @@ -31,7 +31,7 @@ class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with Be import RoundRobin1ReplicaMultiJvmSpec._ private var bookKeeper: BookKeeper = _ - private var localBookKeeper: LocalBookKeeper = _ + // private var localBookKeeper: LocalBookKeeper = _ "A cluster" must { @@ -55,13 +55,13 @@ class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with Be override def beforeAll() = { Cluster.startLocalCluster() - LocalBookKeeperEnsemble.start() + // LocalBookKeeperEnsemble.start() } override def afterAll() = { Cluster.shutdownLocalCluster() - TransactionLog.shutdown() - LocalBookKeeperEnsemble.shutdown() + // TransactionLog.shutdown() + // LocalBookKeeperEnsemble.shutdown() } } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 7e11fc3670..be30615b85 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -120,7 +120,7 @@ class Agent[T](initialValue: T) { * within the given timeout */ def alter(f: T ⇒ T)(timeout: Long): Future[T] = { - def dispatch = updater.?(Update(f))(timeout = timeout).asInstanceOf[Future[T]] + def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]] if (Stm.activeTransaction) { val result = new DefaultPromise[T](timeout) get //Join xa @@ -168,7 +168,7 @@ class Agent[T](initialValue: T) { send((value: T) ⇒ { suspend val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start() - result completeWith threadBased.?(Update(f))(timeout = timeout).asInstanceOf[Future[T]] + result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] value }) result diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 7fcabf0990..72ee614752 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -104,7 +104,7 @@ object Pi extends App { val start = now //send calculate message - master.?(Calculate)(timeout = Actor.Timeout(60000)). + master.?(Calculate, Actor.Timeout(60000)). await.resultOrException match {//wait for the result, with a 60 seconds timeout case Some(pi) => EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))