Refactored Future API to make it more Java friendly
This commit is contained in:
parent
c4f4ddfe6e
commit
b0b294aa74
6 changed files with 17 additions and 24 deletions
|
|
@ -306,7 +306,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
if (isTypedActor) throw e
|
||||
else None
|
||||
}
|
||||
if (future.exception.isDefined) throw future.exception.get._2
|
||||
if (future.exception.isDefined) throw future.exception.get
|
||||
else future.result
|
||||
} else throw new ActorInitializationException(
|
||||
"Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
|
|
@ -1179,7 +1179,7 @@ class LocalActorRef private[akka](
|
|||
}
|
||||
}
|
||||
|
||||
senderFuture.foreach(_.completeWithException(this, reason))
|
||||
senderFuture.foreach(_.completeWithException(reason))
|
||||
|
||||
clearTransaction
|
||||
if (topLevelTransaction) clearTransactionSet
|
||||
|
|
|
|||
|
|
@ -127,17 +127,13 @@ object ActorRegistry extends ListenerManagement {
|
|||
if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor)
|
||||
|
||||
val set = actorsById get id
|
||||
if(set ne null)
|
||||
set add actor
|
||||
if (set ne null) set add actor
|
||||
else {
|
||||
val newSet = new ConcurrentSkipListSet[ActorRef]
|
||||
newSet add actor
|
||||
|
||||
val oldSet = actorsById.putIfAbsent(id,newSet)
|
||||
|
||||
//Parry for two simultaneous putIfAbsent(id,newSet)
|
||||
if(oldSet ne null)
|
||||
oldSet add actor
|
||||
// Parry for two simultaneous putIfAbsent(id,newSet)
|
||||
if (oldSet ne null) oldSet add actor
|
||||
}
|
||||
|
||||
// UUID
|
||||
|
|
@ -154,8 +150,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
actorsByUUID remove actor.uuid
|
||||
|
||||
val set = actorsById get actor.id
|
||||
if (set ne null)
|
||||
set remove actor
|
||||
if (set ne null) set remove actor
|
||||
|
||||
//FIXME: safely remove set if empty, leaks memory
|
||||
|
||||
|
|
|
|||
|
|
@ -629,10 +629,8 @@ private[akka] sealed class TypedActorAspect {
|
|||
}
|
||||
|
||||
private def getResultOrThrowException[T](future: Future[T]): Option[T] =
|
||||
if (future.exception.isDefined) {
|
||||
val (_, cause) = future.exception.get
|
||||
throw cause
|
||||
} else future.result
|
||||
if (future.exception.isDefined) throw future.exception.get
|
||||
else future.result
|
||||
|
||||
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ object Futures {
|
|||
try {
|
||||
promise completeWithResult body
|
||||
} catch {
|
||||
case e => promise completeWithException (None, e)
|
||||
case e => promise completeWithException e
|
||||
}
|
||||
promise
|
||||
}
|
||||
|
|
@ -77,12 +77,12 @@ sealed trait Future[T] {
|
|||
def isExpired: Boolean
|
||||
def timeoutInNanos: Long
|
||||
def result: Option[T]
|
||||
def exception: Option[Tuple2[AnyRef, Throwable]]
|
||||
def exception: Option[Throwable]
|
||||
}
|
||||
|
||||
trait CompletableFuture[T] extends Future[T] {
|
||||
def completeWithResult(result: T)
|
||||
def completeWithException(toBlame: AnyRef, exception: Throwable)
|
||||
def completeWithException(exception: Throwable)
|
||||
}
|
||||
|
||||
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
|
||||
|
|
@ -96,7 +96,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
private val _signal = _lock.newCondition
|
||||
private var _completed: Boolean = _
|
||||
private var _result: Option[T] = None
|
||||
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
|
||||
private var _exception: Option[Throwable] = None
|
||||
|
||||
def await = try {
|
||||
_lock.lock
|
||||
|
|
@ -147,7 +147,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
_lock.unlock
|
||||
}
|
||||
|
||||
def exception: Option[Tuple2[AnyRef, Throwable]] = try {
|
||||
def exception: Option[Throwable] = try {
|
||||
_lock.lock
|
||||
_exception
|
||||
} finally {
|
||||
|
|
@ -165,11 +165,11 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
_lock.unlock
|
||||
}
|
||||
|
||||
def completeWithException(toBlame: AnyRef, exception: Throwable) = try {
|
||||
def completeWithException(exception: Throwable) = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
_exception = Some((toBlame, exception))
|
||||
_exception = Some(exception)
|
||||
}
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
|
|
|
|||
|
|
@ -326,7 +326,7 @@ class RemoteClientHandler(
|
|||
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
|
||||
}
|
||||
future.completeWithException(null, parseException(reply, client.loader))
|
||||
future.completeWithException(parseException(reply, client.loader))
|
||||
}
|
||||
futures.remove(reply.getId)
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ class FutureSpec extends JUnitSuite {
|
|||
val future = actor !!! "Failure"
|
||||
future.await
|
||||
assert(future.exception.isDefined)
|
||||
assert("Expected exception; to test fault-tolerance" === future.exception.get._2.getMessage)
|
||||
assert("Expected exception; to test fault-tolerance" === future.exception.get.getMessage)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue