Fixing #437 by adding "Remote" Future

This commit is contained in:
Viktor Klang 2010-09-16 15:58:46 +02:00
parent 1960241b87
commit 971ebf4501
2 changed files with 31 additions and 17 deletions

View file

@ -160,6 +160,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
if (!_completed) { if (!_completed) {
_completed = true _completed = true
_result = Some(result) _result = Some(result)
onComplete(result)
} }
} finally { } finally {
_signal.signalAll _signal.signalAll
@ -171,6 +172,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
if (!_completed) { if (!_completed) {
_completed = true _completed = true
_exception = Some(exception) _exception = Some(exception)
onCompleteException(exception)
} }
} finally { } finally {
_signal.signalAll _signal.signalAll
@ -178,4 +180,6 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
} }
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
protected def onComplete(result: T) {}
protected def onCompleteException(exception: Throwable) {}
} }

View file

@ -30,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map import scala.collection.mutable.Map
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
/** /**
* Use this object if you need a single remote server on a specific node. * Use this object if you need a single remote server on a specific node.
@ -498,11 +499,9 @@ class RemoteServerHandler(
case RemoteActorSystemMessage.Stop => actorRef.stop case RemoteActorSystemMessage.Stop => actorRef.stop
case _ => // then match on user defined messages case _ => // then match on user defined messages
if (request.getIsOneWay) actorRef.!(message)(sender) if (request.getIsOneWay) actorRef.!(message)(sender)
else { else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some(
try { new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] override def onComplete(result: AnyRef) {
val result = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result) log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId) .setId(request.getId)
@ -511,14 +510,25 @@ class RemoteServerHandler(
.setIsActor(true) .setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
try {
channel.write(replyBuilder.build)
} catch { } catch {
case e: Throwable => case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, true))
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
} }
} }
override def onCompleteException(exception: Throwable) {
try {
channel.write(createErrorReplyMessage(exception, request, true))
} catch {
case e: Throwable =>
server.notifyListeners(RemoteServerError(e, server))
}
}
}
))
} }
} }