diff --git a/akka-actor/src/main/scala/akka/Implicits.scala b/akka-actor/src/main/scala/akka/Implicits.scala
index 6370e1c2fd..0a781649eb 100644
--- a/akka-actor/src/main/scala/akka/Implicits.scala
+++ b/akka-actor/src/main/scala/akka/Implicits.scala
@@ -14,7 +14,10 @@ package object actor {
ref.asInstanceOf[ActorRef]
type Uuid = com.eaio.uuid.UUID
+
def newUuid(): Uuid = new Uuid()
- def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode)
+
+ def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode)
+
def uuidFrom(uuid: String): Uuid = new Uuid(uuid)
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index df29edd650..81574dacff 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -6,18 +6,14 @@ package akka.actor
import akka.event.EventHandler
import akka.dispatch._
-import akka.config.Config._
import akka.config.Supervision._
-import akka.AkkaException
import akka.util._
import ReflectiveAccess._
import java.net.InetSocketAddress
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map => JMap }
-import java.lang.reflect.Field
import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
@@ -39,7 +35,7 @@ private[akka] object ActorRefInternals {
* Abstraction for unification of sender and senderFuture for later reply
*/
abstract class Channel[T] {
-
+
/**
* Sends the specified message to the channel
* Scala API
@@ -943,7 +939,7 @@ class LocalActorRef private[akka] (
performRestart
true
} catch {
- case e =>
+ case e =>
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry
} finally {
@@ -1009,7 +1005,7 @@ class LocalActorRef private[akka] (
private def handleExceptionInDispatch(reason: Throwable, message: Any) = {
EventHandler.error(reason, this, message.toString)
-
+
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
@@ -1121,9 +1117,9 @@ private[akka] case class RemoteActorRef private[akka] (
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = Actor.remote.send[T](
- message, senderOption, senderFuture,
- homeAddress.get, timeout,
- false, this, None,
+ message, senderOption, senderFuture,
+ homeAddress.get, timeout,
+ false, this, None,
actorType, loader)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
@@ -1201,8 +1197,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
*/
def id: String
- def id_=(id: String): Unit
-
+ def id_=(id: String): Unit
+
/**
* User overridable callback/setting.
*
diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala
new file mode 100644
index 0000000000..c3ad4d9c79
--- /dev/null
+++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala
@@ -0,0 +1,44 @@
+package akka.remoteinterface
+
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+import akka.actor.Actor
+import akka.event.EventHandler
+
+/**
+ * Remote client and server event listener that pipes the events to the standard Akka EventHander.
+ *
+ * @author Jonas Bonér
+ */
+class RemoteEventHandler extends Actor {
+ import EventHandler._
+
+ self.id = ID
+ self.dispatcher = EventHandlerDispatcher
+
+ def receive = {
+
+ // client
+ case RemoteClientError(cause, client, address) => EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString)
+ case RemoteClientWriteFailed(request, cause, client, address) => EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(address.toString))
+ case RemoteClientDisconnected(client, address) => EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString)
+ case RemoteClientConnected(client, address) => EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString)
+ case RemoteClientStarted(client, address) => EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString)
+ case RemoteClientShutdown(client, address) => EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString)
+
+ // server
+ case RemoteServerError(cause, server) => EventHandler.error(cause, server, "RemoteServerError")
+ case RemoteServerWriteFailed(request, cause, server, clientAddress) => EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString))
+ case RemoteServerStarted(server) => EventHandler.info(server, "RemoteServerStarted")
+ case RemoteServerShutdown(server) => EventHandler.info(server, "RemoteServerShutdown")
+ case RemoteServerClientConnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString)
+ case RemoteServerClientDisconnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString)
+ case RemoteServerClientClosed(server, clientAddress) => EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString)
+
+ case _ => //ignore other
+ }
+}
+
+
diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
index 62dcc422ee..185f0d2799 100644
--- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
+++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
@@ -5,14 +5,16 @@
package akka.remoteinterface
import akka.japi.Creator
-import java.net.InetSocketAddress
import akka.actor._
import akka.util._
import akka.dispatch.CompletableFuture
-import akka.config.Config.{config, TIME_UNIT}
-import java.util.concurrent.ConcurrentHashMap
import akka.AkkaException
-import reflect.BeanProperty
+
+import scala.reflect.BeanProperty
+
+import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentHashMap
+import java.io.{PrintWriter, PrintStream}
trait RemoteModule {
val UUID_PREFIX = "uuid:"
@@ -20,7 +22,6 @@ trait RemoteModule {
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: => Any): Unit
-
private[akka] def actors: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef]
@@ -28,7 +29,6 @@ trait RemoteModule {
private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef]
private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef]
-
/** Lookup methods **/
private[akka] def findActorById(id: String) : ActorRef = actors.get(id)
@@ -114,25 +114,38 @@ case class RemoteServerWriteFailed(
/**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
*/
-class RemoteClientException private[akka] (message: String,
- @BeanProperty val client: RemoteClientModule,
- val remoteAddress: InetSocketAddress) extends AkkaException(message)
+class RemoteClientException private[akka] (
+ message: String,
+ @BeanProperty val client: RemoteClientModule,
+ val remoteAddress: InetSocketAddress) extends AkkaException(message)
/**
- * Returned when a remote exception cannot be instantiated or parsed
+ * Returned when a remote exception sent over the wire cannot be loaded and instantiated
*/
-case class UnparsableException private[akka] (originalClassName: String,
- originalMessage: String) extends AkkaException(originalMessage)
-
+case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String)
+ extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]"
+ .format(cause.toString, originalClassName, originalMessage)) {
+ override def printStackTrace = cause.printStackTrace
+ override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream)
+ override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter)
+}
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
+
+ lazy val eventHandler: ActorRef = {
+ val handler = Actor.actorOf[RemoteEventHandler].start
+ // add the remote client and server listener that pipes the events to the event handler system
+ addListener(handler)
+ handler
+ }
+
def shutdown {
+ removeListener(eventHandler)
this.shutdownClientModule
this.shutdownServerModule
clear
}
-
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index 120b7e1dfc..210b818784 100644
--- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -5,46 +5,40 @@
package akka.remote.netty
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
+import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
-import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
-import akka.japi.Creator
-import akka.config.Config._
import akka.remoteinterface._
-import akka.actor.{PoisonPill, Index,
- ActorInitializationException, LocalActorRef, newUuid,
- ActorRegistry, Actor, RemoteActorRef,
+import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef,
TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType}
-import akka.AkkaException
-import akka.event.EventHandler
import akka.actor.Actor._
+import akka.config.Config._
import akka.util._
-import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
+import akka.event.EventHandler
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap}
+import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap}
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
-import org.jboss.netty.handler.ssl.SslHandler
-import scala.collection.mutable.{ HashMap }
-import scala.reflect.BeanProperty
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
-import java.net.{ SocketAddress, InetSocketAddress }
+import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
-import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean}
+import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
+import java.util.concurrent._
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@@ -157,29 +151,50 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
}
/**
- * This is the abstract baseclass for netty remote clients,
- * currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
+ * This is the abstract baseclass for netty remote clients, currently there's only an
+ * ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
* reuses an already established connection.
*/
abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
- val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort
+ val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true)
- protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
- protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
- private[remote] val runSwitch = new Switch()
+ val name = this.getClass.getSimpleName + "@" +
+ remoteAddress.getAddress.getHostAddress + "::" +
+ remoteAddress.getPort
+
+ protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
+ protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
+ protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
+
+ private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
protected def notifyListeners(msg: => Any); Unit
+
protected def currentChannel: Channel
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
+
def shutdown: Boolean
+ /**
+ * Returns an array with the current pending messages not yet delivered.
+ */
+ def pendingMessages: Array[Any] = {
+ var messages = Vector[Any]()
+ val iter = pendingRequests.iterator
+ while (iter.hasNext) {
+ val (_, _, message) = iter.next
+ messages = messages :+ MessageSerializer.deserialize(message.getMessage)
+ }
+ messages.toArray
+ }
+
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
@@ -213,37 +228,52 @@ abstract class RemoteClient private[akka] (
* Sends the message across the wire
*/
def send[T](
- request: RemoteMessageProtocol,
- senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
+ request: RemoteMessageProtocol,
+ senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
- val future = currentChannel.write(RemoteEncoder.encode(request))
- future.awaitUninterruptibly()
- if (!future.isCancelled && !future.isSuccess) {
- notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
- throw future.getCause
+ try {
+ val future = currentChannel.write(RemoteEncoder.encode(request))
+ future.awaitUninterruptibly()
+ if (!future.isCancelled && !future.isSuccess) {
+ notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
+ throw future.getCause
+ }
+ } catch {
+ case e: Throwable =>
+ // add the request to the tx log after a failing send
+ notifyListeners(RemoteClientError(e, module, remoteAddress))
+ if (useTransactionLog) pendingRequests.add((true, null, request))
+ else throw e
}
-
None
} else {
- val futureResult = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
- val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
- futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails
- currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
- def operationComplete(future: ChannelFuture) {
- if (future.isCancelled) {
- futures.remove(futureUuid) //Clean this up
- //We don't care about that right now
- } else if (!future.isSuccess) {
- val f = futures.remove(futureUuid) //Clean this up
- if (f ne null)
- f.completeWithException(future.getCause)
- notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
- }
- }
- })
- Some(futureResult)
+ val futureResult = if (senderFuture.isDefined) senderFuture.get
+ else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
+ val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
+ futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
+
+ def handleRequestReplyError(future: ChannelFuture) = {
+ notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
+ if (useTransactionLog) {
+ pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
+ } else {
+ val f = futures.remove(futureUuid) // Clean up future
+ if (f ne null) f.completeWithException(future.getCause)
+ }
+ }
+
+ var future: ChannelFuture = null
+ try {
+ // try to send the original one
+ future = currentChannel.write(RemoteEncoder.encode(request))
+ future.awaitUninterruptibly()
+ if (future.isCancelled) futures.remove(futureUuid) // Clean up future
+ else if (!future.isSuccess) handleRequestReplyError(future)
+ } catch {
+ case e: Exception => handleRequestReplyError(future)
+ }
+ Some(futureResult)
}
} else {
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress)
@@ -252,6 +282,34 @@ abstract class RemoteClient private[akka] (
}
}
+ private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log
+ val nrOfMessages = pendingRequests.size
+ if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
+ var pendingRequest = pendingRequests.peek
+ while (pendingRequest ne null) {
+ val (isOneWay, futureUuid, message) = pendingRequest
+ if (isOneWay) { // sendOneWay
+ val future = currentChannel.write(RemoteEncoder.encode(message))
+ future.awaitUninterruptibly()
+ if (!future.isCancelled && !future.isSuccess) {
+ notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
+ throw future.getCause
+ }
+ } else { // sendRequestReply
+ val future = currentChannel.write(RemoteEncoder.encode(message))
+ future.awaitUninterruptibly()
+ if (future.isCancelled) futures.remove(futureUuid) // Clean up future
+ else if (!future.isSuccess) {
+ val f = futures.remove(futureUuid) // Clean up future
+ if (f ne null) f.completeWithException(future.getCause)
+ notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
+ }
+ }
+ pendingRequests.remove(pendingRequest)
+ pendingRequest = pendingRequests.peek // try to grab next message
+ }
+ }
+
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't register supervisor for " + actorRef + " since it is not under supervision")
@@ -272,6 +330,7 @@ class ActiveRemoteClient private[akka] (
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
+
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile private var bootstrap: ClientBootstrap = _
@volatile private[remote] var connection: ChannelFuture = _
@@ -342,6 +401,7 @@ class ActiveRemoteClient private[akka] (
bootstrap.releaseExternalResources
bootstrap = null
connection = null
+ pendingRequests.clear
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@@ -456,8 +516,16 @@ class ActiveRemoteClientHandler(
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
- client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
- client.resetReconnectionTimeWindow
+ try {
+ if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
+ client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
+ client.resetReconnectionTimeWindow
+ } catch {
+ case e: Throwable =>
+ EventHandler.error(e, this, e.getMessage)
+ client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
+ throw e
+ }
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
@@ -486,7 +554,7 @@ class ActiveRemoteClientHandler(
} catch {
case problem: Throwable =>
EventHandler.error(problem, this, problem.getMessage)
- UnparsableException(classname, exception.getMessage)
+ CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
}
@@ -495,7 +563,8 @@ class ActiveRemoteClientHandler(
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule {
- //Needed for remote testing and switching on/off under run
+
+ // Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
@@ -564,7 +633,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.releaseExternalResources
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
- case e: Exception =>
+ case e: Exception =>
EventHandler.error(e, this, e.getMessage)
}
}
@@ -597,7 +666,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
- case e: Exception =>
+ case e: Exception =>
EventHandler.error(e, this, e.getMessage)
notifyListeners(RemoteServerError(e, this))
}
@@ -988,7 +1057,7 @@ class RemoteServerHandler(
write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch {
- case e: Exception =>
+ case e: Exception =>
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
}
@@ -1206,4 +1275,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
}
}
-}
\ No newline at end of file
+}
diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala
index 0c7421df0a..4538489191 100644
--- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala
+++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala
@@ -1,13 +1,13 @@
package akka.actor.remote
-import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.remote.netty.NettyRemoteSupport
import akka.actor. {Actor, ActorRegistry}
import java.util.concurrent. {TimeUnit, CountDownLatch}
+import org.scalatest.{Spec, WordSpec, BeforeAndAfterAll, BeforeAndAfterEach}
+import java.util.concurrent.atomic.AtomicBoolean
object AkkaRemoteTest {
class ReplyHandlerActor(latch: CountDownLatch, expect: String) extends Actor {
@@ -59,4 +59,97 @@ class AkkaRemoteTest extends
/* Utilities */
def replyHandler(latch: CountDownLatch, expect: String) = Some(Actor.actorOf(new ReplyHandlerActor(latch, expect)).start)
-}
\ No newline at end of file
+}
+
+trait NetworkFailureTest { self: WordSpec =>
+ import akka.actor.Actor._
+ import akka.util.Duration
+
+ // override is subclass if needed
+ val BYTES_PER_SECOND = "60KByte/s"
+ val DELAY_MILLIS = "350ms"
+ val PORT_RANGE = "1024-65535"
+
+ // FIXME add support for TCP FIN by hooking into Netty and do socket.close
+
+ def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = {
+ spawn {
+ try {
+ enableTcpReset()
+ println("===>>> Reply with [TCP RST] for [" + duration + "]")
+ Thread.sleep(duration.toMillis)
+ restoreIP
+ } catch {
+ case e =>
+ dead.set(true)
+ e.printStackTrace
+ }
+ }
+ }
+
+ def throttleNetworkFor(duration: Duration, dead: AtomicBoolean) = {
+ spawn {
+ try {
+ enableNetworkThrottling()
+ println("===>>> Throttling network with [" + BYTES_PER_SECOND + ", " + DELAY_MILLIS + "] for [" + duration + "]")
+ Thread.sleep(duration.toMillis)
+ restoreIP
+ } catch {
+ case e =>
+ dead.set(true)
+ e.printStackTrace
+ }
+ }
+ }
+
+ def dropNetworkFor(duration: Duration, dead: AtomicBoolean) = {
+ spawn {
+ try {
+ enableNetworkDrop()
+ println("===>>> Blocking network [TCP DENY] for [" + duration + "]")
+ Thread.sleep(duration.toMillis)
+ restoreIP
+ } catch {
+ case e =>
+ dead.set(true)
+ e.printStackTrace
+ }
+ }
+ }
+
+ def sleepFor(duration: Duration) = {
+ println("===>>> Sleeping for [" + duration + "]")
+ Thread sleep (duration.toMillis)
+ }
+
+ def enableNetworkThrottling() = {
+ restoreIP()
+ assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "1", "ip", "from", "any", "to", "any").start.waitFor == 0)
+ assert(new ProcessBuilder("sudo", "ipfw", "add", "pipe", "2", "ip", "from", "any", "to", "any").start.waitFor == 0)
+ assert(new ProcessBuilder("sudo", "ipfw", "pipe", "1", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0)
+ assert(new ProcessBuilder("sudo", "ipfw", "pipe", "2", "config", "bw", BYTES_PER_SECOND, "delay", DELAY_MILLIS).start.waitFor == 0)
+ }
+
+ def enableNetworkDrop() = {
+ restoreIP()
+ assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "deny", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0)
+ }
+
+ def enableTcpReset() = {
+ restoreIP()
+ assert(new ProcessBuilder("sudo", "ipfw", "add", "1", "reset", "tcp", "from", "any", "to", "any", PORT_RANGE).start.waitFor == 0)
+ }
+
+ def restoreIP() = {
+ println("===>>> Restoring network")
+ assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "1").start.waitFor == 0)
+ assert(new ProcessBuilder("sudo", "ipfw", "del", "pipe", "2").start.waitFor == 0)
+ assert(new ProcessBuilder("sudo", "ipfw", "flush").start.waitFor == 0)
+ assert(new ProcessBuilder("sudo", "ipfw", "pipe", "flush").start.waitFor == 0)
+ }
+
+ def validateSudo() = {
+ println("===>>> Validating sudo")
+ assert(new ProcessBuilder("sudo", "-v").start.waitFor == 0)
+ }
+}
diff --git a/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala
new file mode 100644
index 0000000000..dc4559a46b
--- /dev/null
+++ b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala
@@ -0,0 +1,119 @@
+package akka.actor.remote
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import akka.actor.Actor._
+import akka.actor.{ActorRef, Actor}
+import akka.util.duration._
+import java.util.concurrent.atomic.AtomicBoolean
+
+object RemoteErrorHandlingNetworkTest {
+ case class Send(actor: ActorRef)
+
+ class RemoteActorSpecActorUnidirectional extends Actor {
+ self.id = "network-drop:unidirectional"
+ def receive = {
+ case "Ping" => self.reply_?("Pong")
+ }
+ }
+
+ class Decrementer extends Actor {
+ def receive = {
+ case "done" => self.reply_?(false)
+ case i: Int if i > 0 =>
+ self.reply_?(i - 1)
+ case i: Int =>
+ self.reply_?(0)
+ this become {
+ case "done" => self.reply_?(true)
+ case _ => //Do Nothing
+ }
+ }
+ }
+
+ class RemoteActorSpecActorBidirectional extends Actor {
+
+ def receive = {
+ case "Hello" =>
+ self.reply("World")
+ case "Failure" =>
+ throw new RuntimeException("Expected exception; to test fault-tolerance")
+ }
+ }
+
+ class RemoteActorSpecActorAsyncSender(latch: CountDownLatch) extends Actor {
+ def receive = {
+ case Send(actor: ActorRef) =>
+ actor ! "Hello"
+ case "World" => latch.countDown
+ }
+ }
+}
+
+class RemoteErrorHandlingNetworkTest extends AkkaRemoteTest with NetworkFailureTest {
+ import RemoteErrorHandlingNetworkTest._
+
+ "Remote actors" should {
+
+ "be able to recover from network drop without loosing any messages" in {
+ validateSudo()
+ val latch = new CountDownLatch(10)
+ implicit val sender = replyHandler(latch, "Pong")
+ val service = actorOf[RemoteActorSpecActorUnidirectional]
+ remote.register(service.id, service)
+ val actor = remote.actorFor(service.id, 5000L, host, port)
+ actor ! "Ping"
+ actor ! "Ping"
+ actor ! "Ping"
+ actor ! "Ping"
+ actor ! "Ping"
+ val dead = new AtomicBoolean(false)
+ dropNetworkFor (10 seconds, dead) // drops the network - in another thread - so async
+ sleepFor (2 seconds) // wait until network drop is done before sending the other messages
+ try { actor ! "Ping" } catch { case e => () } // queue up messages
+ try { actor ! "Ping" } catch { case e => () } // ...
+ try { actor ! "Ping" } catch { case e => () } // ...
+ try { actor ! "Ping" } catch { case e => () } // ...
+ try { actor ! "Ping" } catch { case e => () } // ...
+ latch.await(15, TimeUnit.SECONDS) must be (true) // network should be restored and the messages delivered
+ dead.get must be (false)
+ }
+
+ "be able to recover from TCP RESET without loosing any messages" in {
+ validateSudo()
+ val latch = new CountDownLatch(10)
+ implicit val sender = replyHandler(latch, "Pong")
+ val service = actorOf[RemoteActorSpecActorUnidirectional]
+ remote.register(service.id, service)
+ val actor = remote.actorFor(service.id, 5000L, host, port)
+ actor ! "Ping"
+ actor ! "Ping"
+ actor ! "Ping"
+ actor ! "Ping"
+ actor ! "Ping"
+ val dead = new AtomicBoolean(false)
+ replyWithTcpResetFor (10 seconds, dead)
+ sleepFor (2 seconds)
+ try { actor ! "Ping" } catch { case e => () } // queue up messages
+ try { actor ! "Ping" } catch { case e => () } // ...
+ try { actor ! "Ping" } catch { case e => () } // ...
+ try { actor ! "Ping" } catch { case e => () } // ...
+ try { actor ! "Ping" } catch { case e => () } // ...
+ latch.await(15, TimeUnit.SECONDS) must be (true)
+ dead.get must be (false)
+ }
+/*
+ "sendWithBangAndGetReplyThroughSenderRef" in {
+ remote.register(actorOf[RemoteActorSpecActorBidirectional])
+ implicit val timeout = 500000000L
+ val actor = remote.actorFor(
+ "akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", timeout, host, port)
+ val latch = new CountDownLatch(1)
+ val sender = actorOf( new RemoteActorSpecActorAsyncSender(latch) ).start
+ sender ! Send(actor)
+ latch.await(1, TimeUnit.SECONDS) must be (true)
+ }
+ */
+ }
+}
+
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 342f8e6316..164fae1edc 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -11,7 +11,7 @@ akka {
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
time-unit = "seconds" # Time unit for all timeout properties throughout the config
-
+
event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
@@ -146,6 +146,7 @@ akka {
}
client {
+ retry-message-send-on-failure = on
reconnect-delay = 5
read-timeout = 10
message-frame-size = 1048576
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 589c62af04..82af5a909e 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -344,6 +344,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val junit = Dependencies.junit
val scalatest = Dependencies.scalatest
+ lazy val networkTestsEnabled = systemOptional[Boolean]("akka.test.network", false)
+
+ override def testOptions = super.testOptions ++ {
+ if (!networkTestsEnabled.value) Seq(TestFilter(test => !test.endsWith("NetworkTest")))
+ else Seq.empty
+ }
+
override def bndImportPackage = "javax.transaction;version=1.1" :: super.bndImportPackage.toList
}
diff --git a/scripts/ip-mod.sh b/scripts/ip-mod.sh
new file mode 100755
index 0000000000..e9b509ae59
--- /dev/null
+++ b/scripts/ip-mod.sh
@@ -0,0 +1,31 @@
+#!/bin/sh
+
+# flush rules
+ipfw del pipe 1
+ipfw del pipe 2
+ipfw -q -f flush
+ipfw -q -f pipe flush
+
+if [ "$1" == "" ]; then
+ echo "Options: ip-mod.sh slow"
+ echo " ip-mod.sh block"
+ echo " ip-mod.sh reset"
+ echo " ip-mod.sh restore"
+ exit
+elif [ "$1" == "restore" ]; then
+ echo "restoring normal network"
+ exit
+elif [ "$1" == "slow" ]; then
+ # simulate slow connection
+ echo "enabling slow connection"
+ ipfw add pipe 1 ip from any to any
+ ipfw add pipe 2 ip from any to any
+ ipfw pipe 1 config bw 60KByte/s delay 350ms
+ ipfw pipe 2 config bw 60KByte/s delay 350ms
+elif [ "$1" == "block" ]; then
+ echo "enabling blocked connections"
+ ipfw add 1 deny tcp from any to any 1024-65535
+elif [ "$1" == "reset" ]; then
+ echo "enabling reset connections"
+ ipfw add 1 reset tcp from any to any 1024-65535
+fi