diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index df29edd650..81574dacff 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -6,18 +6,14 @@ package akka.actor import akka.event.EventHandler import akka.dispatch._ -import akka.config.Config._ import akka.config.Supervision._ -import akka.AkkaException import akka.util._ import ReflectiveAccess._ import java.net.InetSocketAddress -import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} -import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.{ Map => JMap } -import java.lang.reflect.Field import scala.reflect.BeanProperty import scala.collection.immutable.Stack @@ -39,7 +35,7 @@ private[akka] object ActorRefInternals { * Abstraction for unification of sender and senderFuture for later reply */ abstract class Channel[T] { - + /** * Sends the specified message to the channel * Scala API @@ -943,7 +939,7 @@ class LocalActorRef private[akka] ( performRestart true } catch { - case e => + case e => EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) false // an error or exception here should trigger a retry } finally { @@ -1009,7 +1005,7 @@ class LocalActorRef private[akka] ( private def handleExceptionInDispatch(reason: Throwable, message: Any) = { EventHandler.error(reason, this, message.toString) - + //Prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -1121,9 +1117,9 @@ private[akka] case class RemoteActorRef private[akka] ( senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = Actor.remote.send[T]( - message, senderOption, senderFuture, - homeAddress.get, timeout, - false, this, None, + message, senderOption, senderFuture, + homeAddress.get, timeout, + false, this, None, actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) @@ -1201,8 +1197,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => */ def id: String - def id_=(id: String): Unit - + def id_=(id: String): Unit + /** * User overridable callback/setting. *
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 8cc45221cb..6f39aaf2cc 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,6 +16,7 @@ import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.actor.Actor._ +import akka.config.Config._ import akka.util._ import akka.event.EventHandler @@ -149,14 +150,16 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } /** - * This is the abstract baseclass for netty remote clients, - * currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that + * This is the abstract baseclass for netty remote clients, currently there's only an + * ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that * reuses an already established connection. */ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { + val useTransactionLog = config.getBool("akka.remote.resend-on-failure", true) + val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort @@ -237,9 +240,10 @@ abstract class RemoteClient private[akka] ( } } catch { case e: Throwable => - pendingRequests.add((true, null, request)) // add the request to the tx log after a failing send + // add the request to the tx log after a failing send notifyListeners(RemoteClientError(e, module, remoteAddress)) - throw e + if (useTransactionLog) pendingRequests.add((true, null, request)) + else throw e } None } else { @@ -249,10 +253,13 @@ abstract class RemoteClient private[akka] ( futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails def handleRequestReplyError(future: ChannelFuture) = { - pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send - val f = futures.remove(futureUuid) // Clean up future - if (f ne null) f.completeWithException(future.getCause) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + if (useTransactionLog) { + pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send + } else { + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) + } } var future: ChannelFuture = null @@ -509,7 +516,7 @@ class ActiveRemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { try { - client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect + if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) client.resetReconnectionTimeWindow } catch { @@ -625,7 +632,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, bootstrap.releaseExternalResources serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { - case e: Exception => + case e: Exception => EventHandler.error(e, this, e.getMessage) } } @@ -658,7 +665,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader))) } } catch { - case e: Exception => + case e: Exception => EventHandler.error(e, this, e.getMessage) notifyListeners(RemoteServerError(e, this)) } @@ -1049,7 +1056,7 @@ class RemoteServerHandler( write(channel, RemoteEncoder.encode(messageBuilder.build)) } catch { - case e: Exception => + case e: Exception => EventHandler.error(e, this, e.getMessage) server.notifyListeners(RemoteServerError(e, server)) } @@ -1267,4 +1274,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") } } -} \ No newline at end of file +} diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala index 19f3313536..444c607043 100644 --- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala +++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala @@ -65,10 +65,13 @@ trait NetworkFailureTest { self: WordSpec => import akka.actor.Actor._ import akka.util.Duration + // override is subclass if needed val BYTES_PER_SECOND = "60KByte/s" val DELAY_MILLIS = "350ms" val PORT_RANGE = "1024-65535" + // FIXME add support for TCP FIN by hooking into Netty and do socket.close + def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = { spawn { try { @@ -144,4 +147,4 @@ trait NetworkFailureTest { self: WordSpec => assert(new ProcessBuilder("ipfw", "flush").start.waitFor == 0) assert(new ProcessBuilder("ipfw", "pipe", "flush").start.waitFor == 0) } -} \ No newline at end of file +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 342f8e6316..a2d78c4d3e 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -11,7 +11,7 @@ akka { enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] time-unit = "seconds" # Time unit for all timeout properties throughout the config - + event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG @@ -146,6 +146,7 @@ akka { } client { + resend-on-failure = true reconnect-delay = 5 read-timeout = 10 message-frame-size = 1048576