Adding support for non-delivery notifications on server-side as well + more code cleanup

This commit is contained in:
Viktor Klang 2011-01-03 14:44:15 +01:00
parent 7a0e8a82de
commit 00840c8f5a
2 changed files with 42 additions and 40 deletions

View file

@ -99,6 +99,10 @@ case class RemoteServerClientDisconnected(
case class RemoteServerClientClosed( case class RemoteServerClientClosed(
@BeanProperty val server: RemoteServerModule, @BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerWriteFailed(
@BeanProperty request: AnyRef,
@BeanProperty cause: Throwable,
@BeanProperty client: RemoteServerModule, remoteAddress: InetSocketAddress) extends RemoteServerLifeCycleEvent
/** /**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.

View file

@ -511,7 +511,6 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
val home = this.address val home = this.address
if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddress.equals? if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddress.equals?
val localRef = findActorByIdOrUuid(serviceId,serviceId) val localRef = findActorByIdOrUuid(serviceId,serviceId)
if (localRef ne null) return localRef //Code significantly simpler with the return statement if (localRef ne null) return localRef //Code significantly simpler with the return statement
} }
} }
@ -804,14 +803,29 @@ class RemoteServerHandler(
val applicationLoader: Option[ClassLoader], val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging { val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging {
import RemoteServerSettings._ import RemoteServerSettings._
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
val CHANNEL_INIT = "channel-init".intern val CHANNEL_INIT = "channel-init".intern
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]() val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]()
val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]() val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]()
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY //Writes the specified message to the specified channel and propagates write errors to listeners
private def write(channel: Channel, message: AnyRef): Unit =
channel.write(message).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
val socketAddress = future.getChannel.getRemoteAddress match {
case i: InetSocketAddress => i
case _ => null
}
server.notifyListeners(RemoteServerWriteFailed(message, future.getCause, server, socketAddress))
}
}
})
/** /**
* ChannelOpen overridden to store open channels for a clean postStop of a node. * ChannelOpen overridden to store open channels for a clean postStop of a node.
@ -842,25 +856,19 @@ class RemoteServerHandler(
} }
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
import scala.collection.JavaConversions.asScalaIterable
val clientAddress = getClientAddress(ctx) val clientAddress = getClientAddress(ctx)
log.slf4j.debug("Remote client [{}] disconnected from [{}]", clientAddress, server.name) log.slf4j.debug("Remote client [{}] disconnected from [{}]", clientAddress, server.name)
// stop all session actors
val channelActors = sessionActors.remove(event.getChannel)
if (channelActors ne null) {
val elems = channelActors.elements
while (elems.hasMoreElements) {
val actor = elems.nextElement
try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)}
}
}
val channelTypedActors = typedSessionActors.remove(event.getChannel) // stop all session actors
if (channelTypedActors ne null) { for (map <- Option(sessionActors.remove(event.getChannel));
val elems = channelTypedActors.elements actor <- asScalaIterable(map.values)) {
while (elems.hasMoreElements) { try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) }
val actor = elems.nextElement }
try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)} // stop all typed session actors
} for (map <- Option(typedSessionActors.remove(event.getChannel));
actor <- asScalaIterable(map.values)) {
try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) }
} }
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
@ -914,11 +922,9 @@ class RemoteServerHandler(
log.slf4j.debug("Dispatching to remote actor [{}:{}]", actorInfo.getTarget, actorInfo.getUuid) log.slf4j.debug("Dispatching to remote actor [{}:{}]", actorInfo.getTarget, actorInfo.getUuid)
val actorRef = val actorRef =
try { try { createActor(actorInfo, channel).start } catch {
createActor(actorInfo, channel).start
} catch {
case e: SecurityException => case e: SecurityException =>
channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor)) write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
return return
} }
@ -949,13 +955,7 @@ class RemoteServerHandler(
if (exception.isDefined) { if (exception.isDefined) {
log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass) log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass)
try { write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
} catch {
case e: Throwable =>
log.slf4j.debug("An error occurred in sending the reply",e)
server.notifyListeners(RemoteServerError(e, server))
}
} }
else if (result.isDefined) { else if (result.isDefined) {
log.slf4j.debug("Returning result from actor invocation [{}]",result.get) log.slf4j.debug("Returning result from actor invocation [{}]",result.get)
@ -975,11 +975,7 @@ class RemoteServerHandler(
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
try { write(channel, messageBuilder.build)
channel.write(messageBuilder.build)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
} }
} }
) )
@ -1015,7 +1011,8 @@ class RemoteServerHandler(
AkkaActorType.TypedActor, AkkaActorType.TypedActor,
None) None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(messageBuilder.build)
write(channel, messageBuilder.build)
log.slf4j.debug("Returning result from remote typed actor invocation [{}]", result) log.slf4j.debug("Returning result from remote typed actor invocation [{}]", result)
} catch { } catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
@ -1024,7 +1021,8 @@ class RemoteServerHandler(
messageReceiver.invoke(typedActor, args: _*) match { messageReceiver.invoke(typedActor, args: _*) match {
case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed
f.onComplete( future => { f.onComplete( future => {
val result: Either[Any,Throwable] = if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) val result: Either[Any,Throwable] =
if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get)
sendResponse(result) sendResponse(result)
}) })
case other => sendResponse(Left(other)) case other => sendResponse(Left(other))
@ -1032,10 +1030,10 @@ class RemoteServerHandler(
} }
} catch { } catch {
case e: InvocationTargetException => case e: InvocationTargetException =>
channel.write(createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor)) write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
case e: Throwable => case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, AkkaActorType.TypedActor)) write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
} }
} }