re #281: Removed type parameter from ActorRef.!! which now returns Option[Any] and added Helpers.narrow and Helpers.narrowSilently.
This commit is contained in:
parent
ee193659c1
commit
1e6a4c012d
12 changed files with 61 additions and 30 deletions
|
|
@ -11,6 +11,7 @@ import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, Completabl
|
|||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
|
||||
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
|
||||
import org.codehaus.aspectwerkz.proxy.Proxy
|
||||
|
|
@ -548,7 +549,7 @@ private[akka] sealed class ActiveObjectAspect {
|
|||
actorRef ! Invocation(joinPoint, true, true, sender, senderFuture)
|
||||
null.asInstanceOf[AnyRef]
|
||||
} else {
|
||||
val result = actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout)
|
||||
val result = narrow[AnyRef](actorRef !! (Invocation(joinPoint, false, isOneWay, sender, senderFuture), timeout))
|
||||
if (result.isDefined) result.get
|
||||
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -285,9 +285,9 @@ trait ActorRef extends TransactionManagement {
|
|||
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def !(implicit sender: Option[ActorRef] = None): Option[T] = {
|
||||
def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
|
||||
if (isRunning) {
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
|
||||
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
|
||||
val isActiveObject = message.isInstanceOf[Invocation]
|
||||
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) {
|
||||
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import java.util.{Map => JMap}
|
|||
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
|
||||
|
|
@ -369,8 +370,8 @@ class RemoteServerHandler(
|
|||
if (request.getIsOneWay) actorRef.!(message)(sender)
|
||||
else {
|
||||
try {
|
||||
val resultOrNone = actorRef.!!(message)(sender)
|
||||
val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null
|
||||
val resultOrNone = narrow[AnyRef](actorRef.!!(message)(sender))
|
||||
val result = if (resultOrNone.isDefined) resultOrNone.get else null
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
|
|||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.dispatch.CompletableFuture
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
|
||||
/**
|
||||
* Implements Oz-style dataflow (single assignment) variables.
|
||||
|
|
@ -102,10 +103,10 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
|
|||
else {
|
||||
val out = actorOf(new Out(this)).start
|
||||
blockedReaders.offer(out)
|
||||
val result = out !! Get
|
||||
val result = narrow[T](out !! Get)
|
||||
out ! Exit
|
||||
result.getOrElse(throw new DataFlowVariableException(
|
||||
"Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
|
||||
if (result.isDefined) result.get
|
||||
else throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,5 +37,26 @@ object Helpers extends Logging {
|
|||
})
|
||||
sb.toString
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException
|
||||
* if the actual type is not assignable from the given one.
|
||||
*/
|
||||
def narrow[T](o: Option[Any]): Option[T] = {
|
||||
require(o != null, "Option to be narrowed must not be null!")
|
||||
o.asInstanceOf[Option[T]]
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible
|
||||
* ClassCastException and return None in that case.
|
||||
*/
|
||||
def narrowSilently[T: Manifest](o: Option[Any]): Option[T] =
|
||||
try {
|
||||
narrow(o)
|
||||
} catch {
|
||||
case e: ClassCastException =>
|
||||
log.warning(e, "Cannot narrow %s to expected type %s!", o, implicitly[Manifest[T]].erasure.getName)
|
||||
None
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
|
|||
import se.scalablesolutions.akka.actor.Actor
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
|
||||
import org.scalatest.Suite
|
||||
import org.junit.runner.RunWith
|
||||
|
|
@ -39,9 +40,9 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
|
|||
}.start
|
||||
|
||||
val result = for {
|
||||
a <- (d.!)
|
||||
b <- (d.!)
|
||||
c <- (d.!)
|
||||
a <- narrow[Int](d !! (testMsg1,5000))
|
||||
b <- narrow[Int](d !! (testMsg2,5000))
|
||||
c <- narrow[Int](d !! (testMsg3,5000))
|
||||
} yield a + b + c
|
||||
|
||||
result.get must be(21)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
import Actor._
|
||||
|
||||
object ExecutorBasedEventDrivenDispatcherActorSpec {
|
||||
|
|
@ -41,8 +42,8 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
|
|||
|
||||
@Test def shouldSendReplySync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result: String = (actor !! ("Hello", 10000)).get
|
||||
assert("World" === result)
|
||||
val result = narrow[String](actor !! ("Hello", 10000))
|
||||
assert("World" === result.get)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
|
||||
import se.scalablesolutions.akka.stm.{TransactionalState, TransactionalMap, TransactionalRef, TransactionalVector}
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
import Actor._
|
||||
|
||||
object InMemoryActorSpec {
|
||||
|
|
@ -116,7 +117,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
stateful.start
|
||||
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
|
||||
val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
|
||||
assert(notifier.get.await(1, TimeUnit.SECONDS))
|
||||
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
|
||||
}
|
||||
|
|
@ -138,7 +139,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
failer.start
|
||||
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
|
||||
val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
|
||||
assert(notifier.get.await(1, TimeUnit.SECONDS))
|
||||
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
|
||||
}
|
||||
|
|
@ -163,7 +164,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
stateful.start
|
||||
stateful ! SetVectorStateOneWay("init") // set init state
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
|
||||
val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
|
||||
assert(notifier.get.await(1, TimeUnit.SECONDS))
|
||||
assert(2 === (stateful !! GetVectorSize).get)
|
||||
}
|
||||
|
|
@ -186,7 +187,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
val failer = actorOf[InMemFailerActor]
|
||||
failer.start
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
|
||||
val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
|
||||
assert(notifier.get.await(1, TimeUnit.SECONDS))
|
||||
assert(1 === (stateful !! GetVectorSize).get)
|
||||
}
|
||||
|
|
@ -211,7 +212,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
stateful.start
|
||||
stateful ! SetRefStateOneWay("init") // set init state
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
|
||||
val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
|
||||
assert(notifier.get.await(1, TimeUnit.SECONDS))
|
||||
assert("new state" === (stateful !! GetRefState).get)
|
||||
}
|
||||
|
|
@ -234,7 +235,7 @@ class InMemoryActorSpec extends JUnitSuite {
|
|||
val failer = actorOf[InMemFailerActor]
|
||||
failer.start
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
val notifier: Option[CountDownLatch] = stateful !! GetNotifier
|
||||
val notifier = narrow[CountDownLatch](stateful !! GetNotifier)
|
||||
assert(notifier.get.await(1, TimeUnit.SECONDS))
|
||||
assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import org.junit.Test
|
|||
import Actor._
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
|
||||
object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec {
|
||||
class TestActor extends Actor {
|
||||
|
|
@ -44,7 +45,7 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite
|
|||
|
||||
@Test def shouldSendReplySync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result: String = (actor !! ("Hello", 10000)).get
|
||||
val result = narrow[String](actor !! ("Hello", 10000)).get
|
||||
assert("World" === result)
|
||||
actor.stop
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
import Actor._
|
||||
|
||||
object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec {
|
||||
|
|
@ -39,7 +40,7 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
|
|||
|
||||
@Test def shouldSendReplySync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result: String = (actor !! ("Hello", 10000)).get
|
||||
val result = narrow[String](actor !! ("Hello", 10000)).get
|
||||
assert("World" === result)
|
||||
actor.stop
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.stm._
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
|
||||
import Actor._
|
||||
|
||||
|
|
@ -89,10 +90,10 @@ class StmSpec extends
|
|||
try {
|
||||
val actor = actorOf[GlobalTransactionVectorTestActor].start
|
||||
actor !! Add(5)
|
||||
val size1: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size"))
|
||||
val size1: Int = narrow(actor !! Size).getOrElse(fail("Could not get Vector::size"))
|
||||
size1 should equal(2)
|
||||
actor !! Add(2)
|
||||
val size2: Int = (actor !! Size).getOrElse(fail("Could not get Vector::size"))
|
||||
val size2 = narrow[Int](actor !! Size).getOrElse(fail("Could not get Vector::size"))
|
||||
size2 should equal(3)
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
@ -108,18 +109,18 @@ class StmSpec extends
|
|||
try {
|
||||
val actor = actorOf[NestedTransactorLevelOneActor].start
|
||||
actor !! Add(2)
|
||||
val size1: Int = (actor !! Size).getOrElse(fail("Could not get size"))
|
||||
val size1: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
|
||||
size1 should equal(2)
|
||||
actor !! Add(7)
|
||||
actor ! "HiLevelOne"
|
||||
val size2: Int = (actor !! Size).getOrElse(fail("Could not get size"))
|
||||
val size2: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
|
||||
size2 should equal(7)
|
||||
actor !! Add(0)
|
||||
actor ! "HiLevelTwo"
|
||||
val size3: Int = (actor !! Size).getOrElse(fail("Could not get size"))
|
||||
val size3: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
|
||||
size3 should equal(0)
|
||||
actor !! Add(3)
|
||||
val size4: Int = (actor !! Size).getOrElse(fail("Could not get size"))
|
||||
val size4: Int = narrow(actor !! Size).getOrElse(fail("Could not get size"))
|
||||
size4 should equal(3)
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.util.Helpers.narrow
|
||||
import Actor._
|
||||
|
||||
object ThreadBasedActorSpec {
|
||||
|
|
@ -40,8 +41,8 @@ class ThreadBasedActorSpec extends JUnitSuite {
|
|||
|
||||
@Test def shouldSendReplySync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result: String = (actor !! ("Hello", 10000)).get
|
||||
assert("World" === result)
|
||||
val result = narrow[String](actor !! ("Hello", 10000))
|
||||
assert("World" === result.get)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue