Switching to PoisonPill to shut down Per-Session actors, and restructuring some Future-code to avoid wasteful object creation
This commit is contained in:
parent
548502958e
commit
c9338e64f7
2 changed files with 27 additions and 30 deletions
|
|
@ -13,7 +13,12 @@ import akka.serialization.RemoteActorSerialization._
|
|||
import akka.japi.Creator
|
||||
import akka.config.Config._
|
||||
import akka.remoteinterface._
|
||||
import akka.actor.{EventHandler, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
||||
import akka.actor.{PoisonPill, EventHandler, Index,
|
||||
ActorInitializationException, LocalActorRef, newUuid,
|
||||
ActorRegistry, Actor, RemoteActorRef,
|
||||
TypedActor, ActorRef, IllegalActorStateException,
|
||||
RemoteActorSystemMessage, uuidFrom, Uuid,
|
||||
Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
||||
import akka.AkkaException
|
||||
import akka.actor.Actor._
|
||||
import akka.util._
|
||||
|
|
@ -193,7 +198,7 @@ abstract class RemoteClient private[akka] (
|
|||
actorRef.id,
|
||||
actorRef.actorClassName,
|
||||
actorRef.timeout,
|
||||
Left(message),
|
||||
Right(message),
|
||||
isOneWay,
|
||||
senderOption,
|
||||
typedActorInfo,
|
||||
|
|
@ -810,8 +815,10 @@ class RemoteServerHandler(
|
|||
// stop all session actors
|
||||
for (map <- Option(sessionActors.remove(event.getChannel));
|
||||
actor <- asScalaIterable(map.values)) {
|
||||
try { actor.stop } catch { case e: Exception => }
|
||||
try { actor ! PoisonPill } catch { case e: Exception => }
|
||||
}
|
||||
|
||||
//FIXME switch approach or use other thread to execute this
|
||||
// stop all typed session actors
|
||||
for (map <- Option(typedSessionActors.remove(event.getChannel));
|
||||
actor <- asScalaIterable(map.values)) {
|
||||
|
|
@ -886,22 +893,17 @@ class RemoteServerHandler(
|
|||
message,
|
||||
request.getActorInfo.getTimeout,
|
||||
None,
|
||||
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout).
|
||||
onComplete(f => {
|
||||
val result = f.result
|
||||
val exception = f.exception
|
||||
|
||||
if (exception.isDefined) {
|
||||
write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
|
||||
}
|
||||
else if (result.isDefined) {
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout).
|
||||
onComplete(_.value.get match {
|
||||
case l: Left[Throwable, Any] => write(channel, createErrorReplyMessage(l.a, request, AkkaActorType.ScalaActor))
|
||||
case r: Right[Throwable, Any] =>
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef),
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Left(result.get),
|
||||
r,
|
||||
true,
|
||||
Some(actorRef),
|
||||
None,
|
||||
|
|
@ -912,7 +914,6 @@ class RemoteServerHandler(
|
|||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
||||
write(channel, RemoteEncoder.encode(messageBuilder.build))
|
||||
}
|
||||
}
|
||||
)
|
||||
))
|
||||
|
|
@ -957,10 +958,10 @@ class RemoteServerHandler(
|
|||
try {
|
||||
val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses)
|
||||
|
||||
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
|
||||
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) //FIXME execute in non-IO thread
|
||||
else {
|
||||
//Sends the response
|
||||
def sendResponse(result: Either[Any,Throwable]): Unit = try {
|
||||
def sendResponse(result: Either[Throwable,Any]): Unit = try {
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
Right(request.getUuid),
|
||||
|
|
@ -982,14 +983,10 @@ class RemoteServerHandler(
|
|||
server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
|
||||
messageReceiver.invoke(typedActor, args: _*) match {
|
||||
case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed
|
||||
f.onComplete( future => {
|
||||
val result: Either[Any,Throwable] =
|
||||
if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get)
|
||||
sendResponse(result)
|
||||
})
|
||||
case other => sendResponse(Left(other))
|
||||
messageReceiver.invoke(typedActor, args: _*) match { //FIXME execute in non-IO thread
|
||||
//If it's a future, we can lift on that to defer the send to when the future is completed
|
||||
case f: Future[_] => f.onComplete( future => sendResponse(future.value.get) )
|
||||
case other => sendResponse(Right(other))
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
|
|
@ -1152,7 +1149,7 @@ class RemoteServerHandler(
|
|||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Right(exception),
|
||||
Left(exception),
|
||||
true,
|
||||
None,
|
||||
None,
|
||||
|
|
|
|||
|
|
@ -138,7 +138,7 @@ object ActorSerialization {
|
|||
actorRef.id,
|
||||
actorRef.actorClassName,
|
||||
actorRef.timeout,
|
||||
Left(m.message),
|
||||
Right(m.message),
|
||||
false,
|
||||
actorRef.getSender,
|
||||
None,
|
||||
|
|
@ -279,7 +279,7 @@ object RemoteActorSerialization {
|
|||
actorId: String,
|
||||
actorClassName: String,
|
||||
timeout: Long,
|
||||
message: Either[Any, Throwable],
|
||||
message: Either[Throwable, Any],
|
||||
isOneWay: Boolean,
|
||||
senderOption: Option[ActorRef],
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
|
|
@ -319,9 +319,9 @@ object RemoteActorSerialization {
|
|||
.setOneWay(isOneWay)
|
||||
|
||||
message match {
|
||||
case Left(message) =>
|
||||
case Right(message) =>
|
||||
messageBuilder.setMessage(MessageSerializer.serialize(message))
|
||||
case Right(exception) =>
|
||||
case Left(exception) =>
|
||||
messageBuilder.setException(ExceptionProtocol.newBuilder
|
||||
.setClassname(exception.getClass.getName)
|
||||
.setMessage(empty(exception.getMessage))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue