diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 17c63bcd57..0a3cd48aa5 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -160,6 +160,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { if (!_completed) { _completed = true _result = Some(result) + onComplete(result) } } finally { _signal.signalAll @@ -171,6 +172,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { if (!_completed) { _completed = true _exception = Some(exception) + onCompleteException(exception) } } finally { _signal.signalAll @@ -178,4 +180,6 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { } private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) + protected def onComplete(result: T) {} + protected def onCompleteException(exception: Throwable) {} } diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 4a95436cd0..c1f25b6d4f 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -30,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler import scala.collection.mutable.Map import scala.reflect.BeanProperty +import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} /** * Use this object if you need a single remote server on a specific node. @@ -498,27 +499,36 @@ class RemoteServerHandler( case RemoteActorSystemMessage.Stop => actorRef.stop case _ => // then match on user defined messages if (request.getIsOneWay) actorRef.!(message)(sender) - else { - try { - val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] - val result = if (resultOrNone.isDefined) resultOrNone.get else null + else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some( + new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ + override def onComplete(result: AnyRef) { + log.debug("Returning result from actor invocation [%s]", result) + val replyBuilder = RemoteReplyProtocol.newBuilder + .setId(request.getId) + .setMessage(MessageSerializer.serialize(result)) + .setIsSuccessful(true) + .setIsActor(true) - log.debug("Returning result from actor invocation [%s]", result) - val replyBuilder = RemoteReplyProtocol.newBuilder - .setId(request.getId) - .setMessage(MessageSerializer.serialize(result)) - .setIsSuccessful(true) - .setIsActor(true) + if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + try { + channel.write(replyBuilder.build) + } catch { + case e: Throwable => + server.notifyListeners(RemoteServerError(e, server)) + } + } - } catch { - case e: Throwable => - channel.write(createErrorReplyMessage(e, request, true)) - server.notifyListeners(RemoteServerError(e, server)) - } + override def onCompleteException(exception: Throwable) { + try { + channel.write(createErrorReplyMessage(exception, request, true)) + } catch { + case e: Throwable => + server.notifyListeners(RemoteServerError(e, server)) + } + } } + )) } }