Merging in Actor per Session + fixing blocking problem with remote typed actors with Future response types

This commit is contained in:
Viktor Klang 2010-11-22 11:57:17 +01:00
parent b6c66986f9
commit 47246b2a27

View file

@ -326,32 +326,25 @@ class RemoteServer extends Logging with ListenerManagement {
} }
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
// TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here if (_isRunning) {
if (_isRunning && !registry.contains(id)) { registry.put(id, actorRef) //TODO change to putIfAbsent
registry.put(id, actorRef);
if (!actorRef.isRunning) actorRef.start if (!actorRef.isRunning) actorRef.start
} }
} }
private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) { private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) {
// TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here if (_isRunning)
if (_isRunning && !registry.contains(id)) { registry.put(id, factory) //TODO change to putIfAbsent
registry.put(id, factory)
}
} }
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) { private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
// TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here if (_isRunning)
if (_isRunning && !registry.contains(id)) { registry.put(id, typedActor) //TODO change to putIfAbsent
registry.put(id, typedActor)
}
} }
private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) { private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) {
// TODO: contains is an alias for containsValue, not containsKey, so the contains is useless here if (_isRunning)
if (_isRunning && !registry.contains(id)) { registry.put(id, factory) //TODO change to putIfAbsent
registry.put(id, factory)
}
} }
/** /**
@ -531,18 +524,21 @@ class RemoteServerHandler(
val clientAddress = getClientAddress(ctx) val clientAddress = getClientAddress(ctx)
log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name) log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name)
// stop all session actors // stop all session actors
val channelActors = sessionActors.remove(event.getChannel()) val channelActors = sessionActors.remove(event.getChannel)
val channelActorsIterator = channelActors.elements() if (channelActors ne null) {
while (channelActorsIterator.hasMoreElements()) { val channelActorsIterator = channelActors.elements
channelActorsIterator.nextElement().stop() while (channelActorsIterator.hasMoreElements) {
channelActorsIterator.nextElement.stop
}
} }
val channelTypedActors = typedSessionActors.remove(event.getChannel()) val channelTypedActors = typedSessionActors.remove(event.getChannel)
val channelTypedActorsIterator = channelTypedActors.elements() if (channelTypedActors ne null) {
while (channelTypedActorsIterator.hasMoreElements()) { val channelTypedActorsIterator = channelTypedActors.elements
TypedActor.stop(channelTypedActorsIterator.nextElement()) while (channelTypedActorsIterator.hasMoreElements) {
TypedActor.stop(channelTypedActorsIterator.nextElement)
}
} }
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
} }
@ -629,7 +625,7 @@ class RemoteServerHandler(
val exception = f.exception val exception = f.exception
if (exception.isDefined) { if (exception.isDefined) {
log.debug("Returning exception from actor invocation [%s]".format(exception.get)) log.debug("Returning exception from actor invocation [%s]",exception.get)
try { try {
channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
} catch { } catch {
@ -679,19 +675,15 @@ class RemoteServerHandler(
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*) val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
else { else {
val result = messageReceiver.invoke(typedActor, args: _*) match { //Sends the response
case f: Future[_] => f.await.result.get //TODO replace this with a Listener on the Future to avoid blocking def sendResponse(result: Either[Any,Throwable]): Unit = try {
case other => other
}
log.debug("Returning result from remote typed actor invocation [%s]", result)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None, None,
Right(request.getUuid), Right(request.getUuid),
actorInfo.getId, actorInfo.getId,
actorInfo.getTarget, actorInfo.getTarget,
actorInfo.getTimeout, actorInfo.getTimeout,
Left(result), result,
true, true,
None, None,
None, None,
@ -699,6 +691,19 @@ class RemoteServerHandler(
None) None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(messageBuilder.build) channel.write(messageBuilder.build)
log.debug("Returning result from remote typed actor invocation [%s]", result)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
messageReceiver.invoke(typedActor, args: _*) match {
case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed
f.onComplete( future => {
val result: Either[Any,Throwable] = if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get)
sendResponse(result)
})
case other => sendResponse(Left(other))
}
} }
} catch { } catch {
case e: InvocationTargetException => case e: InvocationTargetException =>