diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala
index a2d2820cca..8ef49eebda 100644
--- a/akka-core/src/main/scala/actor/ActiveObject.scala
+++ b/akka-core/src/main/scala/actor/ActiveObject.scala
@@ -11,6 +11,7 @@ import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, Completabl
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._
+import se.scalablesolutions.akka.util.Helpers.narrow
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
@@ -548,7 +549,7 @@ private[akka] sealed class ActiveObjectAspect {
actorRef ! Invocation(joinPoint, true, true, sender, senderFuture)
null.asInstanceOf[AnyRef]
} else {
- val result = actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout)
+ val result = narrow[AnyRef](actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout))
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
}
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 5f08c7b900..0531a43438 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -285,9 +285,9 @@ trait ActorRef extends TransactionManagement {
* If you are sending messages using !! then you have to use self.reply(..)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
- def !(implicit sender: Option[ActorRef] = None): Option[T] = {
+ def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
if (isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) {
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index aafe38c910..c374d98863 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -11,6 +11,7 @@ import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.util._
+import se.scalablesolutions.akka.util.Helpers.narrow
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._
import se.scalablesolutions.akka.config.Config.config
@@ -369,8 +370,8 @@ class RemoteServerHandler(
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
- val resultOrNone = actorRef.!!(message)(sender)
- val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
+ val resultOrNone = narrow[AnyRef](actorRef.!!(message)(sender))
+ val result = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index 752c71cead..925eb922a5 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -10,6 +10,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.CompletableFuture
+import se.scalablesolutions.akka.util.Helpers.narrow
/**
* Implements Oz-style dataflow (single assignment) variables.
@@ -102,10 +103,10 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
else {
val out = actorOf(new Out(this)).start
blockedReaders.offer(out)
- val result = out !! Get
+ val result = narrow[T](out !! Get)
out ! Exit
- result.getOrElse(throw new DataFlowVariableException(
- "Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
+ if (result.isDefined) result.get
+ else throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")
}
}
diff --git a/akka-core/src/main/scala/util/Helpers.scala b/akka-core/src/main/scala/util/Helpers.scala
index 4835a4dd05..ccbd896610 100644
--- a/akka-core/src/main/scala/util/Helpers.scala
+++ b/akka-core/src/main/scala/util/Helpers.scala
@@ -37,5 +37,26 @@ object Helpers extends Logging {
})
sb.toString
}
-}
+ /**
+ * Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException
+ * if the actual type is not assignable from the given one.
+ */
+ def narrow[T](o: Option[Any]): Option[T] = {
+ require(o != null, "Option to be narrowed must not be null!")
+ o.asInstanceOf[Option[T]]
+ }
+
+ /**
+ * Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible
+ * ClassCastException and return None in that case.
+ */
+ def narrowSilently[T: Manifest](o: Option[Any]): Option[T] =
+ try {
+ narrow(o)
+ } catch {
+ case e: ClassCastException =>
+ log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName)
+ None
+ }
+}
diff --git a/akka-core/src/test/scala/ActorPatternsTest.scala b/akka-core/src/test/scala/ActorPatternsTest.scala
index 0d4e9b6b08..a130cc2d7c 100644
--- a/akka-core/src/test/scala/ActorPatternsTest.scala
+++ b/akka-core/src/test/scala/ActorPatternsTest.scala
@@ -5,6 +5,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util.Logging
+import se.scalablesolutions.akka.util.Helpers.narrow
import org.scalatest.Suite
import org.junit.runner.RunWith
@@ -39,9 +40,9 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
}.start
val result = for {
- a <- (d.!)
- b <- (d.!)
- c <- (d.!)
+ a <- narrow[Int](d !! (testMsg1,5000))
+ b <- narrow[Int](d !! (testMsg2,5000))
+ c <- narrow[Int](d !! (testMsg3,5000))
} yield a + b + c
result.get must be(21)
diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala
index f876d59527..8084900de2 100644
--- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala
+++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorSpec.scala
@@ -4,6 +4,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
+import se.scalablesolutions.akka.util.Helpers.narrow
import Actor._
object ExecutorBasedEventDrivenDispatcherActorSpec {
@@ -41,8 +42,8 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
- val result: String = (actor !! ("Hello", 10000)).get
- assert("World" === result)
+ val result = narrow[String](actor !! ("Hello", 10000))
+ assert("World" === result.get)
actor.stop
}
diff --git a/akka-core/src/test/scala/InMemoryActorSpec.scala b/akka-core/src/test/scala/InMemoryActorSpec.scala
index 814e3fb841..b35973cac9 100644
--- a/akka-core/src/test/scala/InMemoryActorSpec.scala
+++ b/akka-core/src/test/scala/InMemoryActorSpec.scala
@@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.stm.{TransactionalState, TransactionalMap, TransactionalRef, TransactionalVector}
+import se.scalablesolutions.akka.util.Helpers.narrow
import Actor._
object InMemoryActorSpec {
@@ -116,7 +117,7 @@ class InMemoryActorSpec extends JUnitSuite {
stateful.start
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- val notifier: Option[CountDownLatch] = stateful !! GetNotifier
+ val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@@ -138,7 +139,7 @@ class InMemoryActorSpec extends JUnitSuite {
failer.start
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- val notifier: Option[CountDownLatch] = stateful !! GetNotifier
+ val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
@@ -163,7 +164,7 @@ class InMemoryActorSpec extends JUnitSuite {
stateful.start
stateful ! SetVectorStateOneWay("init") // set init state
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- val notifier: Option[CountDownLatch] = stateful !! GetNotifier
+ val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert(2 === (stateful !! GetVectorSize).get)
}
@@ -186,7 +187,7 @@ class InMemoryActorSpec extends JUnitSuite {
val failer = actorOf[InMemFailerActor]
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- val notifier: Option[CountDownLatch] = stateful !! GetNotifier
+ val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert(1 === (stateful !! GetVectorSize).get)
}
@@ -211,7 +212,7 @@ class InMemoryActorSpec extends JUnitSuite {
stateful.start
stateful ! SetRefStateOneWay("init") // set init state
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- val notifier: Option[CountDownLatch] = stateful !! GetNotifier
+ val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("new state" === (stateful !! GetRefState).get)
}
@@ -234,7 +235,7 @@ class InMemoryActorSpec extends JUnitSuite {
val failer = actorOf[InMemFailerActor]
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- val notifier: Option[CountDownLatch] = stateful !! GetNotifier
+ val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
assert(notifier.get.await(1, TimeUnit.SECONDS))
assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
}
diff --git a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala
index 726f79fa22..78afbe3438 100644
--- a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala
+++ b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala
@@ -6,6 +6,7 @@ import org.junit.Test
import Actor._
import se.scalablesolutions.akka.dispatch.Dispatchers
+import se.scalablesolutions.akka.util.Helpers.narrow
object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
@@ -44,7 +45,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
- val result: String = (actor !! ("Hello", 10000)).get
+ val result = narrow[String](actor !! ("Hello", 10000)).get
assert("World" === result)
actor.stop
}
diff --git a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala
index b1f3dae678..94a93a9a95 100644
--- a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala
+++ b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala
@@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
+import se.scalablesolutions.akka.util.Helpers.narrow
import Actor._
object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec {
@@ -39,7 +40,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
- val result: String = (actor !! ("Hello", 10000)).get
+ val result = narrow[String](actor !! ("Hello", 10000)).get
assert("World" === result)
actor.stop
}
diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala
index 17d4be32bd..5a4459513c 100644
--- a/akka-core/src/test/scala/StmSpec.scala
+++ b/akka-core/src/test/scala/StmSpec.scala
@@ -1,6 +1,7 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.stm._
+import se.scalablesolutions.akka.util.Helpers.narrow
import Actor._
@@ -89,10 +90,10 @@ class StmSpec extends
try {
val actor = actorOf[GlobalTransactionVectorTestActor].start
actor !! Add(5)
- val size1: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size"))
+ val size1: Int = narrow(actor !! Size).getOrElse(fail("Could not get Vector::size"))
size1 should equal(2)
actor !! Add(2)
- val size2: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size"))
+ val size2 = narrow[Int](actor !! Size).getOrElse(fail("Could not get Vector::size"))
size2 should equal(3)
} catch {
case e =>
@@ -108,18 +109,18 @@ class StmSpec extends
try {
val actor = actorOf[NestedTransactorLevelOneActor].start
actor !! Add(2)
- val size1: Int = (actor !! Size).getOrElse(fail("Could not get size"))
+ val size1: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
size1 should equal(2)
actor !! Add(7)
actor ! "HiLevelOne"
- val size2: Int = (actor !! Size).getOrElse(fail("Could not get size"))
+ val size2: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
size2 should equal(7)
actor !! Add(0)
actor ! "HiLevelTwo"
- val size3: Int = (actor !! Size).getOrElse(fail("Could not get size"))
+ val size3: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
size3 should equal(0)
actor !! Add(3)
- val size4: Int = (actor !! Size).getOrElse(fail("Could not get size"))
+ val size4: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
size4 should equal(3)
} catch {
case e =>
diff --git a/akka-core/src/test/scala/ThreadBasedActorSpec.scala b/akka-core/src/test/scala/ThreadBasedActorSpec.scala
index d10a39965b..edbd1ca26d 100644
--- a/akka-core/src/test/scala/ThreadBasedActorSpec.scala
+++ b/akka-core/src/test/scala/ThreadBasedActorSpec.scala
@@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
+import se.scalablesolutions.akka.util.Helpers.narrow
import Actor._
object ThreadBasedActorSpec {
@@ -40,8 +41,8 @@ class ThreadBasedActorSpec extends JUnitSuite {
@Test def shouldSendReplySync = {
val actor = actorOf[TestActor].start
- val result: String = (actor !! ("Hello", 10000)).get
- assert("World" === result)
+ val result = narrow[String](actor !! ("Hello", 10000))
+ assert("World" === result.get)
actor.stop
}