From 00840c8f5aa450e34a1a0c2576732c0290bc8ea2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 3 Jan 2011 14:44:15 +0100 Subject: [PATCH] Adding support for non-delivery notifications on server-side as well + more code cleanup --- .../remoteinterface/RemoteInterface.scala | 4 + .../remote/netty/NettyRemoteSupport.scala | 78 +++++++++---------- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 891c930ef5..c09c5c873b 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -99,6 +99,10 @@ case class RemoteServerClientDisconnected( case class RemoteServerClientClosed( @BeanProperty val server: RemoteServerModule, @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerWriteFailed( + @BeanProperty request: AnyRef, + @BeanProperty cause: Throwable, + @BeanProperty client: RemoteServerModule, remoteAddress: InetSocketAddress) extends RemoteServerLifeCycleEvent /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b7a4089fca..98bf270ab5 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -511,7 +511,6 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with val home = this.address if (host == home.getHostName && port == home.getPort) {//TODO: switch to InetSocketAddress.equals? val localRef = findActorByIdOrUuid(serviceId,serviceId) - if (localRef ne null) return localRef //Code significantly simpler with the return statement } } @@ -804,14 +803,29 @@ class RemoteServerHandler( val applicationLoader: Option[ClassLoader], val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging { import RemoteServerSettings._ - - val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val CHANNEL_INIT = "channel-init".intern + applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY + val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]() val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]() - applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY + //Writes the specified message to the specified channel and propagates write errors to listeners + private def write(channel: Channel, message: AnyRef): Unit = + channel.write(message).addListener( + new ChannelFutureListener { + def operationComplete(future: ChannelFuture): Unit = { + if (future.isCancelled) { + //Not interesting at the moment + } else if (!future.isSuccess) { + val socketAddress = future.getChannel.getRemoteAddress match { + case i: InetSocketAddress => i + case _ => null + } + server.notifyListeners(RemoteServerWriteFailed(message, future.getCause, server, socketAddress)) + } + } + }) /** * ChannelOpen overridden to store open channels for a clean postStop of a node. @@ -842,25 +856,19 @@ class RemoteServerHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + import scala.collection.JavaConversions.asScalaIterable val clientAddress = getClientAddress(ctx) log.slf4j.debug("Remote client [{}] disconnected from [{}]", clientAddress, server.name) - // stop all session actors - val channelActors = sessionActors.remove(event.getChannel) - if (channelActors ne null) { - val elems = channelActors.elements - while (elems.hasMoreElements) { - val actor = elems.nextElement - try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)} - } - } - val channelTypedActors = typedSessionActors.remove(event.getChannel) - if (channelTypedActors ne null) { - val elems = channelTypedActors.elements - while (elems.hasMoreElements) { - val actor = elems.nextElement - try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e)} - } + // stop all session actors + for (map <- Option(sessionActors.remove(event.getChannel)); + actor <- asScalaIterable(map.values)) { + try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) } + } + // stop all typed session actors + for (map <- Option(typedSessionActors.remove(event.getChannel)); + actor <- asScalaIterable(map.values)) { + try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) } } server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) @@ -914,11 +922,9 @@ class RemoteServerHandler( log.slf4j.debug("Dispatching to remote actor [{}:{}]", actorInfo.getTarget, actorInfo.getUuid) val actorRef = - try { - createActor(actorInfo, channel).start - } catch { + try { createActor(actorInfo, channel).start } catch { case e: SecurityException => - channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor)) + write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor)) server.notifyListeners(RemoteServerError(e, server)) return } @@ -949,13 +955,7 @@ class RemoteServerHandler( if (exception.isDefined) { log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass) - try { - channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) - } catch { - case e: Throwable => - log.slf4j.debug("An error occurred in sending the reply",e) - server.notifyListeners(RemoteServerError(e, server)) - } + write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) } else if (result.isDefined) { log.slf4j.debug("Returning result from actor invocation [{}]",result.get) @@ -975,11 +975,7 @@ class RemoteServerHandler( // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - try { - channel.write(messageBuilder.build) - } catch { - case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) - } + write(channel, messageBuilder.build) } } ) @@ -1015,7 +1011,8 @@ class RemoteServerHandler( AkkaActorType.TypedActor, None) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(messageBuilder.build) + + write(channel, messageBuilder.build) log.slf4j.debug("Returning result from remote typed actor invocation [{}]", result) } catch { case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) @@ -1024,7 +1021,8 @@ class RemoteServerHandler( messageReceiver.invoke(typedActor, args: _*) match { case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed f.onComplete( future => { - val result: Either[Any,Throwable] = if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) + val result: Either[Any,Throwable] = + if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get) sendResponse(result) }) case other => sendResponse(Left(other)) @@ -1032,10 +1030,10 @@ class RemoteServerHandler( } } catch { case e: InvocationTargetException => - channel.write(createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor)) + write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) case e: Throwable => - channel.write(createErrorReplyMessage(e, request, AkkaActorType.TypedActor)) + write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor)) server.notifyListeners(RemoteServerError(e, server)) } }