From 2867a45e22a9e472aed638b4b98abbdea97d7340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Mar 2011 12:28:01 +0100 Subject: [PATCH 01/11] Added a remote event handler that pipes remote server and client events to the standard EventHandler system --- .../akka/remote/RemoteEventHandler.scala | 46 +++++++++++++++++++ .../remote/netty/NettyRemoteSupport.scala | 33 ++++++------- 2 files changed, 60 insertions(+), 19 deletions(-) create mode 100644 akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala diff --git a/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala b/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala new file mode 100644 index 0000000000..6d3535330d --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.remote + +import akka.actor.Actor +import akka.event.EventHandler +import akka.remoteinterface._ + +/** + * Remote client and server event listener that pipes the events to the standard Akka EventHander. + * + * @author Jonas Bonér + */ +class RemoteEventHandler extends Actor { + import EventHandler._ + println("((((((((((((((( REMOTE EVENT") + + self.id = ID + self.dispatcher = EventHandlerDispatcher + + def receive = { + + // client + case RemoteClientError(cause, client, address) => EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString) + case RemoteClientWriteFailed(request, cause, client, address) => EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(address.toString)) + case RemoteClientDisconnected(client, address) => EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString) + case RemoteClientConnected(client, address) => EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString) + case RemoteClientStarted(client, address) => EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString) + case RemoteClientShutdown(client, address) => EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString) + + // server + case RemoteServerError(cause, server) => EventHandler.error(cause, server, "RemoteServerError") + case RemoteServerWriteFailed(request, cause, server, clientAddress) => EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString)) + case RemoteServerStarted(server) => EventHandler.info(server, "RemoteServerStarted") + case RemoteServerShutdown(server) => EventHandler.info(server, "RemoteServerShutdown") + case RemoteServerClientConnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString) + case RemoteServerClientDisconnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString) + case RemoteServerClientClosed(server, clientAddress) => EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString) + + case _ => //ignore other + } +} + + diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b515b6706f..dabd0c5a3e 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -7,44 +7,36 @@ package akka.remote.netty import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ -import akka.config.ConfigurationException import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization._ -import akka.japi.Creator -import akka.config.Config._ import akka.remoteinterface._ -import akka.actor.{PoisonPill, Index, - ActorInitializationException, LocalActorRef, newUuid, - ActorRegistry, Actor, RemoteActorRef, +import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} -import akka.AkkaException -import akka.event.EventHandler import akka.actor.Actor._ import akka.util._ -import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import akka.event.EventHandler import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture} import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap} +import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap} import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } -import org.jboss.netty.handler.ssl.SslHandler -import scala.collection.mutable.{ HashMap } -import scala.reflect.BeanProperty +import scala.collection.mutable.HashMap -import java.net.{ SocketAddress, InetSocketAddress } +import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } -import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean} +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } +import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} +import akka.remote.{RemoteEventHandler, MessageSerializer, RemoteClientSettings, RemoteServerSettings} object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -212,8 +204,9 @@ abstract class RemoteClient private[akka] ( * Sends the message across the wire */ def send[T]( - request: RemoteMessageProtocol, - senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + request: RemoteMessageProtocol, + senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + if (isRunning) { if (request.getOneWay) { val future = currentChannel.write(RemoteEncoder.encode(request)) @@ -222,7 +215,6 @@ abstract class RemoteClient private[akka] ( notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) throw future.getCause } - None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -497,6 +489,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with //Needed for remote testing and switching on/off under run val optimizeLocal = new AtomicBoolean(true) + // add the remote client and server listener that pipes the events to the event handler system + addListener(Actor.actorOf[RemoteEventHandler].start) + def optimizeLocalScoped_?() = optimizeLocal.get protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = { From dfcbf7501120cd99be8996b541863da71a221959 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Mar 2011 12:48:40 +0100 Subject: [PATCH 02/11] refactored remote event handler and added deregistration of it on remote shutdown --- .../remoteinterface}/RemoteEventHandler.scala | 6 ++---- .../remoteinterface/RemoteInterface.scala | 21 ++++++++++++------- .../remote/netty/NettyRemoteSupport.scala | 12 +++++------ 3 files changed, 22 insertions(+), 17 deletions(-) rename {akka-remote/src/main/scala/akka/remote => akka-actor/src/main/scala/akka/remoteinterface}/RemoteEventHandler.scala (96%) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala similarity index 96% rename from akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala rename to akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala index 6d3535330d..c3ad4d9c79 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala @@ -1,12 +1,11 @@ +package akka.remoteinterface + /** * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.remote - import akka.actor.Actor import akka.event.EventHandler -import akka.remoteinterface._ /** * Remote client and server event listener that pipes the events to the standard Akka EventHander. @@ -15,7 +14,6 @@ import akka.remoteinterface._ */ class RemoteEventHandler extends Actor { import EventHandler._ - println("((((((((((((((( REMOTE EVENT") self.id = ID self.dispatcher = EventHandlerDispatcher diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 62dcc422ee..3fd26bbfbb 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -5,14 +5,15 @@ package akka.remoteinterface import akka.japi.Creator -import java.net.InetSocketAddress import akka.actor._ import akka.util._ import akka.dispatch.CompletableFuture -import akka.config.Config.{config, TIME_UNIT} -import java.util.concurrent.ConcurrentHashMap import akka.AkkaException -import reflect.BeanProperty + +import scala.reflect.BeanProperty + +import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentHashMap trait RemoteModule { val UUID_PREFIX = "uuid:" @@ -20,7 +21,6 @@ trait RemoteModule { def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope protected[akka] def notifyListeners(message: => Any): Unit - private[akka] def actors: ConcurrentHashMap[String, ActorRef] private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef] @@ -28,7 +28,6 @@ trait RemoteModule { private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef] private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef] - /** Lookup methods **/ private[akka] def findActorById(id: String) : ActorRef = actors.get(id) @@ -126,13 +125,21 @@ case class UnparsableException private[akka] (originalClassName: String, abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { + + lazy val eventHandler: ActorRef = { + val handler = Actor.actorOf[RemoteEventHandler].start + // add the remote client and server listener that pipes the events to the event handler system + addListener(handler) + handler + } + def shutdown { + removeListener(eventHandler) this.shutdownClientModule this.shutdownServerModule clear } - /** * Creates a Client-managed ActorRef out of the Actor of the specified Class. * If the supplied host and port is identical of the configured local node, it will be a local actor diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index dabd0c5a3e..2f4579cc36 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -36,7 +36,8 @@ import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} -import akka.remote.{RemoteEventHandler, MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import akka.remoteinterface.{RemoteEventHandler} +import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -209,12 +210,13 @@ abstract class RemoteClient private[akka] ( if (isRunning) { if (request.getOneWay) { + txLog.add(request) val future = currentChannel.write(RemoteEncoder.encode(request)) future.awaitUninterruptibly() if (!future.isCancelled && !future.isSuccess) { notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) throw future.getCause - } + } else None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -486,11 +488,9 @@ class ActiveRemoteClientHandler( * Provides the implementation of the Netty remote support */ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule { - //Needed for remote testing and switching on/off under run - val optimizeLocal = new AtomicBoolean(true) - // add the remote client and server listener that pipes the events to the event handler system - addListener(Actor.actorOf[RemoteEventHandler].start) + // Needed for remote testing and switching on/off under run + val optimizeLocal = new AtomicBoolean(true) def optimizeLocalScoped_?() = optimizeLocal.get From cd93f5750fa83d87f8be95812ae7ae78415f64d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Mar 2011 16:23:37 +0100 Subject: [PATCH 03/11] 1. Added a 'pending-messages' tx log for all pending messages that are not yet delivered to the remote host, this tx log is retried upon successful remote client reconnect. 2. Fixed broken code in UnparsableException and renamed it to UnknownRemoteException. --- .../src/main/scala/akka/Implicits.scala | 5 +- .../remoteinterface/RemoteInterface.scala | 20 +++-- .../remote/netty/NettyRemoteSupport.scala | 82 +++++++++++-------- 3 files changed, 67 insertions(+), 40 deletions(-) diff --git a/akka-actor/src/main/scala/akka/Implicits.scala b/akka-actor/src/main/scala/akka/Implicits.scala index 6370e1c2fd..0a781649eb 100644 --- a/akka-actor/src/main/scala/akka/Implicits.scala +++ b/akka-actor/src/main/scala/akka/Implicits.scala @@ -14,7 +14,10 @@ package object actor { ref.asInstanceOf[ActorRef] type Uuid = com.eaio.uuid.UUID + def newUuid(): Uuid = new Uuid() - def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode) + + def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode) + def uuidFrom(uuid: String): Uuid = new Uuid(uuid) } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 3fd26bbfbb..b56ed67277 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -14,6 +14,7 @@ import scala.reflect.BeanProperty import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap +import java.io.{PrintWriter, PrintStream} trait RemoteModule { val UUID_PREFIX = "uuid:" @@ -113,16 +114,21 @@ case class RemoteServerWriteFailed( /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. */ -class RemoteClientException private[akka] (message: String, - @BeanProperty val client: RemoteClientModule, - val remoteAddress: InetSocketAddress) extends AkkaException(message) +class RemoteClientException private[akka] ( + message: String, + @BeanProperty val client: RemoteClientModule, + val remoteAddress: InetSocketAddress) extends AkkaException(message) /** - * Returned when a remote exception cannot be instantiated or parsed + * Returned when a remote exception sent over the wire cannot be loaded and instantiated */ -case class UnparsableException private[akka] (originalClassName: String, - originalMessage: String) extends AkkaException(originalMessage) - +case class UnknownRemoteException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) + extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]" + .format(cause.toString, originalClassName, originalMessage)) { + override def printStackTrace = cause.printStackTrace + override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream) + override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter) +} abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 2f4579cc36..129b1c4f48 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -34,10 +34,10 @@ import scala.collection.mutable.HashMap import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import akka.remoteinterface.{RemoteEventHandler} import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import java.util.concurrent._ object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -157,19 +157,25 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort + val name = this.getClass.getSimpleName + "@" + + remoteAddress.getAddress.getHostAddress + "::" + + remoteAddress.getPort - protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - private[remote] val runSwitch = new Switch() + protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] + protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + + private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) private[remote] def isRunning = runSwitch.isOn protected def notifyListeners(msg: => Any); Unit + protected def currentChannel: Channel def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean + def shutdown: Boolean /** @@ -207,36 +213,19 @@ abstract class RemoteClient private[akka] ( def send[T]( request: RemoteMessageProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { - if (isRunning) { if (request.getOneWay) { - txLog.add(request) - val future = currentChannel.write(RemoteEncoder.encode(request)) - future.awaitUninterruptibly() - if (!future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - throw future.getCause - } else + pendingMessages.add((true, null, request)) + sendPendingMessages() None } else { - val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) - futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - futures.remove(futureUuid) //Clean this up - //We don't care about that right now - } else if (!future.isSuccess) { - val f = futures.remove(futureUuid) //Clean this up - if (f ne null) - f.completeWithException(future.getCause) - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) - Some(futureResult) + val futureResult = if (senderFuture.isDefined) senderFuture.get + else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) + futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails + pendingMessages.add((false, futureUuid, request)) + sendPendingMessages() + Some(futureResult) } } else { val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) @@ -245,6 +234,32 @@ abstract class RemoteClient private[akka] ( } } + private[remote] def sendPendingMessages() = { + var pendingMessage = pendingMessages.poll + while (pendingMessage ne null) { + val (isOneWay, futureUuid, message) = pendingMessage + if (isOneWay) { + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + throw future.getCause + } + } else { + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (future.isCancelled) { + futures.remove(futureUuid) // Clean up future + } else if (!future.isSuccess) { + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + } + } + pendingMessage = pendingMessages.poll + } + } + private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( "Can't register supervisor for " + actorRef + " since it is not under supervision") @@ -265,6 +280,7 @@ class ActiveRemoteClient private[akka] ( module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { import RemoteClientSettings._ + //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @volatile private[remote] var connection: ChannelFuture = _ @@ -335,6 +351,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources bootstrap = null connection = null + pendingMessages.clear } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -449,6 +466,7 @@ class ActiveRemoteClientHandler( } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.sendPendingMessages() // try to send pending message (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) client.resetReconnectionTimeWindow } @@ -479,7 +497,7 @@ class ActiveRemoteClientHandler( } catch { case problem: Throwable => EventHandler.error(problem, this, problem.getMessage) - UnparsableException(classname, exception.getMessage) + UnknownRemoteException(problem, classname, exception.getMessage) } } } From b4256431cddc0a70f2845925366e883dc5029b17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Mar 2011 16:23:37 +0100 Subject: [PATCH 04/11] 1. Added a 'pending-messages' tx log for all pending messages that are not yet delivered to the remote host, this tx log is retried upon successful remote client reconnect. 2. Fixed broken code in UnparsableException and renamed it to UnknownRemoteException. --- .../src/main/scala/akka/Implicits.scala | 5 +- .../remoteinterface/RemoteInterface.scala | 20 +++-- .../remote/netty/NettyRemoteSupport.scala | 83 ++++++++++++------- 3 files changed, 68 insertions(+), 40 deletions(-) diff --git a/akka-actor/src/main/scala/akka/Implicits.scala b/akka-actor/src/main/scala/akka/Implicits.scala index 6370e1c2fd..0a781649eb 100644 --- a/akka-actor/src/main/scala/akka/Implicits.scala +++ b/akka-actor/src/main/scala/akka/Implicits.scala @@ -14,7 +14,10 @@ package object actor { ref.asInstanceOf[ActorRef] type Uuid = com.eaio.uuid.UUID + def newUuid(): Uuid = new Uuid() - def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode) + + def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode) + def uuidFrom(uuid: String): Uuid = new Uuid(uuid) } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 3fd26bbfbb..b56ed67277 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -14,6 +14,7 @@ import scala.reflect.BeanProperty import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap +import java.io.{PrintWriter, PrintStream} trait RemoteModule { val UUID_PREFIX = "uuid:" @@ -113,16 +114,21 @@ case class RemoteServerWriteFailed( /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. */ -class RemoteClientException private[akka] (message: String, - @BeanProperty val client: RemoteClientModule, - val remoteAddress: InetSocketAddress) extends AkkaException(message) +class RemoteClientException private[akka] ( + message: String, + @BeanProperty val client: RemoteClientModule, + val remoteAddress: InetSocketAddress) extends AkkaException(message) /** - * Returned when a remote exception cannot be instantiated or parsed + * Returned when a remote exception sent over the wire cannot be loaded and instantiated */ -case class UnparsableException private[akka] (originalClassName: String, - originalMessage: String) extends AkkaException(originalMessage) - +case class UnknownRemoteException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) + extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]" + .format(cause.toString, originalClassName, originalMessage)) { + override def printStackTrace = cause.printStackTrace + override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream) + override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter) +} abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 2f4579cc36..ba83c58c0c 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -34,10 +34,10 @@ import scala.collection.mutable.HashMap import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import akka.remoteinterface.{RemoteEventHandler} import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import java.util.concurrent._ object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -157,19 +157,25 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort + val name = this.getClass.getSimpleName + "@" + + remoteAddress.getAddress.getHostAddress + "::" + + remoteAddress.getPort - protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - private[remote] val runSwitch = new Switch() + protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] + protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + + private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) private[remote] def isRunning = runSwitch.isOn protected def notifyListeners(msg: => Any); Unit + protected def currentChannel: Channel def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean + def shutdown: Boolean /** @@ -207,36 +213,19 @@ abstract class RemoteClient private[akka] ( def send[T]( request: RemoteMessageProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { - if (isRunning) { if (request.getOneWay) { - txLog.add(request) - val future = currentChannel.write(RemoteEncoder.encode(request)) - future.awaitUninterruptibly() - if (!future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - throw future.getCause - } else + pendingMessages.add((true, null, request)) + sendPendingMessages() None } else { - val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) - futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - futures.remove(futureUuid) //Clean this up - //We don't care about that right now - } else if (!future.isSuccess) { - val f = futures.remove(futureUuid) //Clean this up - if (f ne null) - f.completeWithException(future.getCause) - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) - Some(futureResult) + val futureResult = if (senderFuture.isDefined) senderFuture.get + else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) + futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails + pendingMessages.add((false, futureUuid, request)) + sendPendingMessages() + Some(futureResult) } } else { val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) @@ -245,6 +234,33 @@ abstract class RemoteClient private[akka] ( } } + private[remote] def sendPendingMessages() = { + var pendingMessage = pendingMessages.peek // try to grab first message + while (pendingMessage ne null) { + val (isOneWay, futureUuid, message) = pendingMessage + if (isOneWay) { // sendOneWay + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + throw future.getCause + } + } else { // sendRequestReply + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (future.isCancelled) { + futures.remove(futureUuid) // Clean up future + } else if (!future.isSuccess) { + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + } + } + pendingMessages.remove(pendingMessage) // message delivered; remove from tx log + pendingMessage = pendingMessages.peek // try to grab next message + } + } + private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( "Can't register supervisor for " + actorRef + " since it is not under supervision") @@ -265,6 +281,7 @@ class ActiveRemoteClient private[akka] ( module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { import RemoteClientSettings._ + //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @volatile private[remote] var connection: ChannelFuture = _ @@ -335,6 +352,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources bootstrap = null connection = null + pendingMessages.clear } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -449,6 +467,7 @@ class ActiveRemoteClientHandler( } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.sendPendingMessages() // try to send pending message (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) client.resetReconnectionTimeWindow } @@ -479,7 +498,7 @@ class ActiveRemoteClientHandler( } catch { case problem: Throwable => EventHandler.error(problem, this, problem.getMessage) - UnparsableException(classname, exception.getMessage) + UnknownRemoteException(problem, classname, exception.getMessage) } } } From 259ecf8a8b20c244c1952650b76a1925fb48459f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 Mar 2011 08:36:43 +0100 Subject: [PATCH 05/11] Added accessor for pending messages --- .../remote/netty/NettyRemoteSupport.scala | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index ba83c58c0c..a51022c95f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -5,6 +5,7 @@ package akka.remote.netty import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} +import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.serialization.RemoteActorSerialization @@ -31,12 +32,11 @@ import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} -import akka.remoteinterface.{RemoteEventHandler} -import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import java.util.concurrent._ object RemoteEncoder { @@ -163,7 +163,7 @@ abstract class RemoteClient private[akka] ( protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) @@ -178,6 +178,13 @@ abstract class RemoteClient private[akka] ( def shutdown: Boolean + /** + * Returns an array with the current pending messages not yet delivered. + */ + def pendingMessages: Array[Any] = pendingRequests + .toArray.asInstanceOf[Array[(Boolean, Uuid, RemoteMessageProtocol)]] + .map(req => MessageSerializer.deserialize(req._3.getMessage)) + /** * Converts the message to the wireprotocol and sends the message across the wire */ @@ -215,7 +222,7 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { if (isRunning) { if (request.getOneWay) { - pendingMessages.add((true, null, request)) + pendingRequests.add((true, null, request)) sendPendingMessages() None } else { @@ -223,7 +230,7 @@ abstract class RemoteClient private[akka] ( else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails - pendingMessages.add((false, futureUuid, request)) + pendingRequests.add((false, futureUuid, request)) sendPendingMessages() Some(futureResult) } @@ -235,7 +242,7 @@ abstract class RemoteClient private[akka] ( } private[remote] def sendPendingMessages() = { - var pendingMessage = pendingMessages.peek // try to grab first message + var pendingMessage = pendingRequests.peek // try to grab first message while (pendingMessage ne null) { val (isOneWay, futureUuid, message) = pendingMessage if (isOneWay) { // sendOneWay @@ -256,8 +263,8 @@ abstract class RemoteClient private[akka] ( notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) } } - pendingMessages.remove(pendingMessage) // message delivered; remove from tx log - pendingMessage = pendingMessages.peek // try to grab next message + pendingRequests.remove(pendingMessage) // message delivered; remove from tx log + pendingMessage = pendingRequests.peek // try to grab next message } } @@ -352,7 +359,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources bootstrap = null connection = null - pendingMessages.clear + pendingRequests.clear } private[akka] def isWithinReconnectionTimeWindow: Boolean = { From 3f719ef6107e5d188df5c88baa7fc65207685d86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 Mar 2011 16:06:55 +0100 Subject: [PATCH 06/11] 1. Fixed issues with remote message tx log. 2. Added trait for network failure testing that supports 'TCP RST', 'TCP DENY' and message throttling/delay. 3. Added test for the remote transaction log. Both for TCP RST and TCP DENY. --- .../remote/netty/NettyRemoteSupport.scala | 79 +++++++++--- .../test/scala/remote/AkkaRemoteTest.scala | 89 ++++++++++++- .../remote/RemoteErrorHandlingTest.scala | 117 ++++++++++++++++++ project/build/AkkaProject.scala | 1 + 4 files changed, 265 insertions(+), 21 deletions(-) create mode 100644 akka-remote/src/test/scala/remote/RemoteErrorHandlingTest.scala diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index a51022c95f..f3f14ca5e1 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -181,9 +181,15 @@ abstract class RemoteClient private[akka] ( /** * Returns an array with the current pending messages not yet delivered. */ - def pendingMessages: Array[Any] = pendingRequests - .toArray.asInstanceOf[Array[(Boolean, Uuid, RemoteMessageProtocol)]] - .map(req => MessageSerializer.deserialize(req._3.getMessage)) + def pendingMessages: Array[Any] = { + var messages = Vector[Any]() + val iter = pendingRequests.iterator + while (iter.hasNext) { + val (_, _, message) = iter.next + messages = messages :+ MessageSerializer.deserialize(message.getMessage) + } + messages.toArray + } /** * Converts the message to the wireprotocol and sends the message across the wire @@ -222,16 +228,43 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { if (isRunning) { if (request.getOneWay) { - pendingRequests.add((true, null, request)) - sendPendingMessages() + try { + val future = currentChannel.write(RemoteEncoder.encode(request)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + throw future.getCause + } + } catch { + case e: Throwable => + pendingRequests.add((true, null, request)) // add the request to the tx log after a failing send + notifyListeners(RemoteClientError(e, module, remoteAddress)) + throw e + } None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails - pendingRequests.add((false, futureUuid, request)) - sendPendingMessages() + + def handleRequestReplyError(future: ChannelFuture) = { + pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + } + + var future: ChannelFuture = null + try { + // try to send the original one + future = currentChannel.write(RemoteEncoder.encode(request)) + future.awaitUninterruptibly() + if (future.isCancelled) futures.remove(futureUuid) // Clean up future + else if (!future.isSuccess) handleRequestReplyError(future) + } catch { + case e: Exception => handleRequestReplyError(future) + } Some(futureResult) } } else { @@ -241,10 +274,12 @@ abstract class RemoteClient private[akka] ( } } - private[remote] def sendPendingMessages() = { - var pendingMessage = pendingRequests.peek // try to grab first message - while (pendingMessage ne null) { - val (isOneWay, futureUuid, message) = pendingMessage + private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log + val nrOfMessages = pendingRequests.size + if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages) + var pendingRequest = pendingRequests.peek + while (pendingRequest ne null) { + val (isOneWay, futureUuid, message) = pendingRequest if (isOneWay) { // sendOneWay val future = currentChannel.write(RemoteEncoder.encode(message)) future.awaitUninterruptibly() @@ -255,16 +290,15 @@ abstract class RemoteClient private[akka] ( } else { // sendRequestReply val future = currentChannel.write(RemoteEncoder.encode(message)) future.awaitUninterruptibly() - if (future.isCancelled) { - futures.remove(futureUuid) // Clean up future - } else if (!future.isSuccess) { + if (future.isCancelled) futures.remove(futureUuid) // Clean up future + else if (!future.isSuccess) { val f = futures.remove(futureUuid) // Clean up future if (f ne null) f.completeWithException(future.getCause) notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) } } - pendingRequests.remove(pendingMessage) // message delivered; remove from tx log - pendingMessage = pendingRequests.peek // try to grab next message + pendingRequests.remove(pendingRequest) + pendingRequest = pendingRequests.peek // try to grab next message } } @@ -474,9 +508,16 @@ class ActiveRemoteClientHandler( } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.sendPendingMessages() // try to send pending message (still there after client/server crash ard reconnect - client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) - client.resetReconnectionTimeWindow + try { + client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect + client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) + client.resetReconnectionTimeWindow + } catch { + case e: Throwable => + EventHandler.error(e, this, e.getMessage) + client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) + throw e + } } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala index 0c7421df0a..19f3313536 100644 --- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala +++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala @@ -1,13 +1,13 @@ package akka.actor.remote -import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith import akka.remote.netty.NettyRemoteSupport import akka.actor. {Actor, ActorRegistry} import java.util.concurrent. {TimeUnit, CountDownLatch} +import org.scalatest.{Spec, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach} +import java.util.concurrent.atomic.AtomicBoolean object AkkaRemoteTest { class ReplyHandlerActor(latch: CountDownLatch, expect: String) extends Actor { @@ -59,4 +59,89 @@ class AkkaRemoteTest extends /* Utilities */ def replyHandler(latch: CountDownLatch, expect: String) = Some(Actor.actorOf(new ReplyHandlerActor(latch, expect)).start) +} + +trait NetworkFailureTest { self: WordSpec => + import akka.actor.Actor._ + import akka.util.Duration + + val BYTES_PER_SECOND = "60KByte/s" + val DELAY_MILLIS = "350ms" + val PORT_RANGE = "1024-65535" + + def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = { + spawn { + try { + enableTcpReset() + println("===>>> Reply with [TCP RST] for [" + duration + "]") + Thread.sleep(duration.toMillis) + restoreIP + } catch { + case e => + dead.set(true) + e.printStackTrace + } + } + } + + def throttleNetworkFor(duration: Duration, dead: AtomicBoolean) = { + spawn { + try { + enableNetworkThrottling() + println("===>>> Throttling network with [" + BYTES_PER_SECOND + ", " + DELAY_MILLIS + "] for [" + duration + "]") + Thread.sleep(duration.toMillis) + restoreIP + } catch { + case e => + dead.set(true) + e.printStackTrace + } + } + } + + def dropNetworkFor(duration: Duration, dead: AtomicBoolean) = { + spawn { + try { + enableNetworkDrop() + println("===>>> Blocking network [TCP DENY] for [" + duration + "]") + Thread.sleep(duration.toMillis) + restoreIP + } catch { + case e => + dead.set(true) + e.printStackTrace + } + } + } + + def sleepFor(duration: Duration) = { + println("===>>> Sleeping for [" + duration + "]") + Thread sleep (duration.toMillis) + } + + def enableNetworkThrottling() = { + restoreIP() + assert(new ProcessBuilder("ipfw", "add", "pipe", "1", "ip", "from", "any", "to", "any").start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "add", "pipe", "2", "ip", "from", "any", "to", "any").start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "pipe", "1", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "pipe", "2", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0) + } + + def enableNetworkDrop() = { + restoreIP() + assert(new ProcessBuilder("ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0) + } + + def enableTcpReset() = { + restoreIP() + assert(new ProcessBuilder("ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0) + } + + def restoreIP() = { + println("===>>> Restoring network") + assert(new ProcessBuilder("ipfw", "del", "pipe", "1").start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "del", "pipe", "2").start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "flush").start.waitFor == 0) + assert(new ProcessBuilder("ipfw", "pipe", "flush").start.waitFor == 0) + } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/remote/RemoteErrorHandlingTest.scala b/akka-remote/src/test/scala/remote/RemoteErrorHandlingTest.scala new file mode 100644 index 0000000000..f3b89172e3 --- /dev/null +++ b/akka-remote/src/test/scala/remote/RemoteErrorHandlingTest.scala @@ -0,0 +1,117 @@ +package akka.actor.remote + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import akka.actor.Actor._ +import akka.actor.{ActorRef, Actor} +import akka.util.duration._ +import java.util.concurrent.atomic.AtomicBoolean + +object RemoteErrorHandlingTest { + case class Send(actor: ActorRef) + + class RemoteActorSpecActorUnidirectional extends Actor { + self.id = "network-drop:unidirectional" + def receive = { + case "Ping" => self.reply_?("Pong") + } + } + + class Decrementer extends Actor { + def receive = { + case "done" => self.reply_?(false) + case i: Int if i > 0 => + self.reply_?(i - 1) + case i: Int => + self.reply_?(0) + this become { + case "done" => self.reply_?(true) + case _ => //Do Nothing + } + } + } + + class RemoteActorSpecActorBidirectional extends Actor { + + def receive = { + case "Hello" => + self.reply("World") + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } + } + + class RemoteActorSpecActorAsyncSender(latch: CountDownLatch) extends Actor { + def receive = { + case Send(actor: ActorRef) => + actor ! "Hello" + case "World" => latch.countDown + } + } +} + +class RemoteErrorHandlingTest extends AkkaRemoteTest with NetworkFailureTest { + import RemoteErrorHandlingTest._ + + "Remote actors" should { + + "be able to recover from network drop without loosing any messages" in { + val latch = new CountDownLatch(10) + implicit val sender = replyHandler(latch, "Pong") + val service = actorOf[RemoteActorSpecActorUnidirectional] + remote.register(service.id, service) + val actor = remote.actorFor(service.id, 5000L, host, port) + actor ! "Ping" + actor ! "Ping" + actor ! "Ping" + actor ! "Ping" + actor ! "Ping" + val dead = new AtomicBoolean(false) + dropNetworkFor (10 seconds, dead) // drops the network - in another thread - so async + sleepFor (2 seconds) // wait until network drop is done before sending the other messages + try { actor ! "Ping" } catch { case e => () } // queue up messages + try { actor ! "Ping" } catch { case e => () } // ... + try { actor ! "Ping" } catch { case e => () } // ... + try { actor ! "Ping" } catch { case e => () } // ... + try { actor ! "Ping" } catch { case e => () } // ... + latch.await(15, TimeUnit.SECONDS) must be (true) // network should be restored and the messages delivered + dead.get must be (false) + } + + "be able to recover from TCP RESET without loosing any messages" in { + val latch = new CountDownLatch(10) + implicit val sender = replyHandler(latch, "Pong") + val service = actorOf[RemoteActorSpecActorUnidirectional] + remote.register(service.id, service) + val actor = remote.actorFor(service.id, 5000L, host, port) + actor ! "Ping" + actor ! "Ping" + actor ! "Ping" + actor ! "Ping" + actor ! "Ping" + val dead = new AtomicBoolean(false) + replyWithTcpResetFor (10 seconds, dead) + sleepFor (2 seconds) + try { actor ! "Ping" } catch { case e => () } // queue up messages + try { actor ! "Ping" } catch { case e => () } // ... + try { actor ! "Ping" } catch { case e => () } // ... + try { actor ! "Ping" } catch { case e => () } // ... + try { actor ! "Ping" } catch { case e => () } // ... + latch.await(15, TimeUnit.SECONDS) must be (true) + dead.get must be (false) + } +/* + "sendWithBangAndGetReplyThroughSenderRef" in { + remote.register(actorOf[RemoteActorSpecActorBidirectional]) + implicit val timeout = 500000000L + val actor = remote.actorFor( + "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, host, port) + val latch = new CountDownLatch(1) + val sender = actorOf( new RemoteActorSpecActorAsyncSender(latch) ).start + sender ! Send(actor) + latch.await(1, TimeUnit.SECONDS) must be (true) + } + */ + } +} + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5a04b66627..4b59c401cb 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -346,6 +346,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val junit = Dependencies.junit val scalatest = Dependencies.scalatest + override def testOptions = createTestFilter( _.endsWith("Spec")) override def bndImportPackage = "javax.transaction;version=1.1" :: super.bndImportPackage.toList } From 604a21d363703a7d098067adf5641c04016231f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 Mar 2011 16:09:44 +0100 Subject: [PATCH 07/11] added script to simulate network failure scenarios and restore original settings --- scripts/ip-mod.sh | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100755 scripts/ip-mod.sh diff --git a/scripts/ip-mod.sh b/scripts/ip-mod.sh new file mode 100755 index 0000000000..e9b509ae59 --- /dev/null +++ b/scripts/ip-mod.sh @@ -0,0 +1,31 @@ +#!/bin/sh + +# flush rules +ipfw del pipe 1 +ipfw del pipe 2 +ipfw -q -f flush +ipfw -q -f pipe flush + +if [ "$1" == "" ]; then + echo "Options: ip-mod.sh slow" + echo " ip-mod.sh block" + echo " ip-mod.sh reset" + echo " ip-mod.sh restore" + exit +elif [ "$1" == "restore" ]; then + echo "restoring normal network" + exit +elif [ "$1" == "slow" ]; then + # simulate slow connection + echo "enabling slow connection" + ipfw add pipe 1 ip from any to any + ipfw add pipe 2 ip from any to any + ipfw pipe 1 config bw 60KByte/s delay 350ms + ipfw pipe 2 config bw 60KByte/s delay 350ms +elif [ "$1" == "block" ]; then + echo "enabling blocked connections" + ipfw add 1 deny tcp from any to any 1024-65535 +elif [ "$1" == "reset" ]; then + echo "enabling reset connections" + ipfw add 1 reset tcp from any to any 1024-65535 +fi From 6d13a390154935ba2c3e45a81ea6a0f1f9e69c10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 Mar 2011 16:25:36 +0100 Subject: [PATCH 08/11] Changed UnknownRemoteException to CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException - should be more clear now. --- .../src/main/scala/akka/remoteinterface/RemoteInterface.scala | 2 +- .../src/main/scala/akka/remote/netty/NettyRemoteSupport.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index b56ed67277..185f0d2799 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -122,7 +122,7 @@ class RemoteClientException private[akka] ( /** * Returned when a remote exception sent over the wire cannot be loaded and instantiated */ -case class UnknownRemoteException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) +case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]" .format(cause.toString, originalClassName, originalMessage)) { override def printStackTrace = cause.printStackTrace diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index f3f14ca5e1..8cc45221cb 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -546,7 +546,7 @@ class ActiveRemoteClientHandler( } catch { case problem: Throwable => EventHandler.error(problem, this, problem.getMessage) - UnknownRemoteException(problem, classname, exception.getMessage) + CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage) } } } From 78faeaa45af785860f96371c069f15dc134f4dc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 27 Mar 2011 22:58:50 +0200 Subject: [PATCH 09/11] 1. Added config option to enable/disable the remote client transaction log for resending failed messages. 2. Swallows exceptions on appending to transaction log and do not complete the Future matching the message. --- .../src/main/scala/akka/actor/ActorRef.scala | 22 ++++++------- .../remote/netty/NettyRemoteSupport.scala | 31 ++++++++++++------- .../test/scala/remote/AkkaRemoteTest.scala | 5 ++- config/akka-reference.conf | 3 +- 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index df29edd650..81574dacff 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -6,18 +6,14 @@ package akka.actor import akka.event.EventHandler import akka.dispatch._ -import akka.config.Config._ import akka.config.Supervision._ -import akka.AkkaException import akka.util._ import ReflectiveAccess._ import java.net.InetSocketAddress -import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} -import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.{ Map => JMap } -import java.lang.reflect.Field import scala.reflect.BeanProperty import scala.collection.immutable.Stack @@ -39,7 +35,7 @@ private[akka] object ActorRefInternals { * Abstraction for unification of sender and senderFuture for later reply */ abstract class Channel[T] { - + /** * Sends the specified message to the channel * Scala API @@ -943,7 +939,7 @@ class LocalActorRef private[akka] ( performRestart true } catch { - case e => + case e => EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) false // an error or exception here should trigger a retry } finally { @@ -1009,7 +1005,7 @@ class LocalActorRef private[akka] ( private def handleExceptionInDispatch(reason: Throwable, message: Any) = { EventHandler.error(reason, this, message.toString) - + //Prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -1121,9 +1117,9 @@ private[akka] case class RemoteActorRef private[akka] ( senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = Actor.remote.send[T]( - message, senderOption, senderFuture, - homeAddress.get, timeout, - false, this, None, + message, senderOption, senderFuture, + homeAddress.get, timeout, + false, this, None, actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) @@ -1201,8 +1197,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => */ def id: String - def id_=(id: String): Unit - + def id_=(id: String): Unit + /** * User overridable callback/setting. *

diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 8cc45221cb..6f39aaf2cc 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,6 +16,7 @@ import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.actor.Actor._ +import akka.config.Config._ import akka.util._ import akka.event.EventHandler @@ -149,14 +150,16 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } /** - * This is the abstract baseclass for netty remote clients, - * currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that + * This is the abstract baseclass for netty remote clients, currently there's only an + * ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that * reuses an already established connection. */ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { + val useTransactionLog = config.getBool("akka.remote.resend-on-failure", true) + val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort @@ -237,9 +240,10 @@ abstract class RemoteClient private[akka] ( } } catch { case e: Throwable => - pendingRequests.add((true, null, request)) // add the request to the tx log after a failing send + // add the request to the tx log after a failing send notifyListeners(RemoteClientError(e, module, remoteAddress)) - throw e + if (useTransactionLog) pendingRequests.add((true, null, request)) + else throw e } None } else { @@ -249,10 +253,13 @@ abstract class RemoteClient private[akka] ( futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails def handleRequestReplyError(future: ChannelFuture) = { - pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send - val f = futures.remove(futureUuid) // Clean up future - if (f ne null) f.completeWithException(future.getCause) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + if (useTransactionLog) { + pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send + } else { + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) + } } var future: ChannelFuture = null @@ -509,7 +516,7 @@ class ActiveRemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { try { - client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect + if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) client.resetReconnectionTimeWindow } catch { @@ -625,7 +632,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, bootstrap.releaseExternalResources serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { - case e: Exception => + case e: Exception => EventHandler.error(e, this, e.getMessage) } } @@ -658,7 +665,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader))) } } catch { - case e: Exception => + case e: Exception => EventHandler.error(e, this, e.getMessage) notifyListeners(RemoteServerError(e, this)) } @@ -1049,7 +1056,7 @@ class RemoteServerHandler( write(channel, RemoteEncoder.encode(messageBuilder.build)) } catch { - case e: Exception => + case e: Exception => EventHandler.error(e, this, e.getMessage) server.notifyListeners(RemoteServerError(e, server)) } @@ -1267,4 +1274,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") } } -} \ No newline at end of file +} diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala index 19f3313536..444c607043 100644 --- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala +++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala @@ -65,10 +65,13 @@ trait NetworkFailureTest { self: WordSpec => import akka.actor.Actor._ import akka.util.Duration + // override is subclass if needed val BYTES_PER_SECOND = "60KByte/s" val DELAY_MILLIS = "350ms" val PORT_RANGE = "1024-65535" + // FIXME add support for TCP FIN by hooking into Netty and do socket.close + def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = { spawn { try { @@ -144,4 +147,4 @@ trait NetworkFailureTest { self: WordSpec => assert(new ProcessBuilder("ipfw", "flush").start.waitFor == 0) assert(new ProcessBuilder("ipfw", "pipe", "flush").start.waitFor == 0) } -} \ No newline at end of file +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 342f8e6316..a2d78c4d3e 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -11,7 +11,7 @@ akka { enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] time-unit = "seconds" # Time unit for all timeout properties throughout the config - + event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG @@ -146,6 +146,7 @@ akka { } client { + resend-on-failure = true reconnect-delay = 5 read-timeout = 10 message-frame-size = 1048576 From 6c5f6305b9f342c4371d2ee5d8f7aa38ca321359 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Mon, 28 Mar 2011 14:13:58 +1300 Subject: [PATCH 10/11] Add system property to enable network failure tests --- ...ingTest.scala => RemoteErrorHandlingNetworkTest.scala} | 6 +++--- project/build/AkkaProject.scala | 8 +++++++- 2 files changed, 10 insertions(+), 4 deletions(-) rename akka-remote/src/test/scala/remote/{RemoteErrorHandlingTest.scala => RemoteErrorHandlingNetworkTest.scala} (95%) diff --git a/akka-remote/src/test/scala/remote/RemoteErrorHandlingTest.scala b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala similarity index 95% rename from akka-remote/src/test/scala/remote/RemoteErrorHandlingTest.scala rename to akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala index f3b89172e3..03574a2dc3 100644 --- a/akka-remote/src/test/scala/remote/RemoteErrorHandlingTest.scala +++ b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala @@ -7,7 +7,7 @@ import akka.actor.{ActorRef, Actor} import akka.util.duration._ import java.util.concurrent.atomic.AtomicBoolean -object RemoteErrorHandlingTest { +object RemoteErrorHandlingNetworkTest { case class Send(actor: ActorRef) class RemoteActorSpecActorUnidirectional extends Actor { @@ -50,8 +50,8 @@ object RemoteErrorHandlingTest { } } -class RemoteErrorHandlingTest extends AkkaRemoteTest with NetworkFailureTest { - import RemoteErrorHandlingTest._ +class RemoteErrorHandlingNetworkTest extends AkkaRemoteTest with NetworkFailureTest { + import RemoteErrorHandlingNetworkTest._ "Remote actors" should { diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 4b59c401cb..f33a916da9 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -346,7 +346,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val junit = Dependencies.junit val scalatest = Dependencies.scalatest - override def testOptions = createTestFilter( _.endsWith("Spec")) + lazy val networkTestsEnabled = systemOptional[Boolean]("akka.test.network", false) + + override def testOptions = super.testOptions ++ { + if (!networkTestsEnabled.value) Seq(TestFilter(test => !test.endsWith("NetworkTest"))) + else Seq.empty + } + override def bndImportPackage = "javax.transaction;version=1.1" :: super.bndImportPackage.toList } From 28b6933d99ff0f2b36305e76dd2da551b3266c49 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Mon, 28 Mar 2011 14:15:39 +1300 Subject: [PATCH 11/11] Add sudo directly to network tests --- .../test/scala/remote/AkkaRemoteTest.scala | 25 +++++++++++-------- .../RemoteErrorHandlingNetworkTest.scala | 2 ++ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala index 444c607043..4538489191 100644 --- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala +++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala @@ -124,27 +124,32 @@ trait NetworkFailureTest { self: WordSpec => def enableNetworkThrottling() = { restoreIP() - assert(new ProcessBuilder("ipfw", "add", "pipe", "1", "ip", "from", "any", "to", "any").start.waitFor == 0) - assert(new ProcessBuilder("ipfw", "add", "pipe", "2", "ip", "from", "any", "to", "any").start.waitFor == 0) - assert(new ProcessBuilder("ipfw", "pipe", "1", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0) - assert(new ProcessBuilder("ipfw", "pipe", "2", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "1", "ip", "from", "any", "to", "any").start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "2", "ip", "from", "any", "to", "any").start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "pipe", "1", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "pipe", "2", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0) } def enableNetworkDrop() = { restoreIP() - assert(new ProcessBuilder("ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0) } def enableTcpReset() = { restoreIP() - assert(new ProcessBuilder("ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0) } def restoreIP() = { println("===>>> Restoring network") - assert(new ProcessBuilder("ipfw", "del", "pipe", "1").start.waitFor == 0) - assert(new ProcessBuilder("ipfw", "del", "pipe", "2").start.waitFor == 0) - assert(new ProcessBuilder("ipfw", "flush").start.waitFor == 0) - assert(new ProcessBuilder("ipfw", "pipe", "flush").start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "1").start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "2").start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "flush").start.waitFor == 0) + assert(new ProcessBuilder("sudo", "ipfw", "pipe", "flush").start.waitFor == 0) + } + + def validateSudo() = { + println("===>>> Validating sudo") + assert(new ProcessBuilder("sudo", "-v").start.waitFor == 0) } } diff --git a/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala index 03574a2dc3..dc4559a46b 100644 --- a/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala +++ b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala @@ -56,6 +56,7 @@ class RemoteErrorHandlingNetworkTest extends AkkaRemoteTest with NetworkFailureT "Remote actors" should { "be able to recover from network drop without loosing any messages" in { + validateSudo() val latch = new CountDownLatch(10) implicit val sender = replyHandler(latch, "Pong") val service = actorOf[RemoteActorSpecActorUnidirectional] @@ -79,6 +80,7 @@ class RemoteErrorHandlingNetworkTest extends AkkaRemoteTest with NetworkFailureT } "be able to recover from TCP RESET without loosing any messages" in { + validateSudo() val latch = new CountDownLatch(10) implicit val sender = replyHandler(latch, "Pong") val service = actorOf[RemoteActorSpecActorUnidirectional]