1. Added config option to enable/disable the remote client transaction log for resending failed messages.

2. Swallows exceptions on appending to transaction log and do not complete the Future matching the message.
This commit is contained in:
Jonas Bonér 2011-03-27 22:58:50 +02:00
parent e6c658f58d
commit cf80e6a33d
4 changed files with 34 additions and 27 deletions

View file

@ -6,18 +6,14 @@ package akka.actor
import akka.event.EventHandler import akka.event.EventHandler
import akka.dispatch._ import akka.dispatch._
import akka.config.Config._
import akka.config.Supervision._ import akka.config.Supervision._
import akka.AkkaException
import akka.util._ import akka.util._
import ReflectiveAccess._ import ReflectiveAccess._
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map => JMap } import java.util.{ Map => JMap }
import java.lang.reflect.Field
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import scala.collection.immutable.Stack import scala.collection.immutable.Stack
@ -39,7 +35,7 @@ private[akka] object ActorRefInternals {
* Abstraction for unification of sender and senderFuture for later reply * Abstraction for unification of sender and senderFuture for later reply
*/ */
abstract class Channel[T] { abstract class Channel[T] {
/** /**
* Sends the specified message to the channel * Sends the specified message to the channel
* Scala API * Scala API
@ -943,7 +939,7 @@ class LocalActorRef private[akka] (
performRestart performRestart
true true
} catch { } catch {
case e => case e =>
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry false // an error or exception here should trigger a retry
} finally { } finally {
@ -1009,7 +1005,7 @@ class LocalActorRef private[akka] (
private def handleExceptionInDispatch(reason: Throwable, message: Any) = { private def handleExceptionInDispatch(reason: Throwable, message: Any) = {
EventHandler.error(reason, this, message.toString) EventHandler.error(reason, this, message.toString)
//Prevent any further messages to be processed until the actor has been restarted //Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this) dispatcher.suspend(this)
@ -1121,9 +1117,9 @@ private[akka] case class RemoteActorRef private[akka] (
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = Actor.remote.send[T]( val future = Actor.remote.send[T](
message, senderOption, senderFuture, message, senderOption, senderFuture,
homeAddress.get, timeout, homeAddress.get, timeout,
false, this, None, false, this, None,
actorType, loader) actorType, loader)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
@ -1201,8 +1197,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
*/ */
def id: String def id: String
def id_=(id: String): Unit def id_=(id: String): Unit
/** /**
* User overridable callback/setting. * User overridable callback/setting.
* <p/> * <p/>

View file

@ -16,6 +16,7 @@ import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef,
RemoteActorSystemMessage, uuidFrom, Uuid, RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType} Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.actor.Actor._ import akka.actor.Actor._
import akka.config.Config._
import akka.util._ import akka.util._
import akka.event.EventHandler import akka.event.EventHandler
@ -149,14 +150,16 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
} }
/** /**
* This is the abstract baseclass for netty remote clients, * This is the abstract baseclass for netty remote clients, currently there's only an
* currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that * ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
* reuses an already established connection. * reuses an already established connection.
*/ */
abstract class RemoteClient private[akka] ( abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule, val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) { val remoteAddress: InetSocketAddress) {
val useTransactionLog = config.getBool("akka.remote.resend-on-failure", true)
val name = this.getClass.getSimpleName + "@" + val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort remoteAddress.getPort
@ -237,9 +240,10 @@ abstract class RemoteClient private[akka] (
} }
} catch { } catch {
case e: Throwable => case e: Throwable =>
pendingRequests.add((true, null, request)) // add the request to the tx log after a failing send // add the request to the tx log after a failing send
notifyListeners(RemoteClientError(e, module, remoteAddress)) notifyListeners(RemoteClientError(e, module, remoteAddress))
throw e if (useTransactionLog) pendingRequests.add((true, null, request))
else throw e
} }
None None
} else { } else {
@ -249,10 +253,13 @@ abstract class RemoteClient private[akka] (
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
def handleRequestReplyError(future: ChannelFuture) = { def handleRequestReplyError(future: ChannelFuture) = {
pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
if (useTransactionLog) {
pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
} else {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
}
} }
var future: ChannelFuture = null var future: ChannelFuture = null
@ -509,7 +516,7 @@ class ActiveRemoteClientHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
try { try {
client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow client.resetReconnectionTimeWindow
} catch { } catch {
@ -625,7 +632,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.releaseExternalResources bootstrap.releaseExternalResources
serverModule.notifyListeners(RemoteServerShutdown(serverModule)) serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch { } catch {
case e: Exception => case e: Exception =>
EventHandler.error(e, this, e.getMessage) EventHandler.error(e, this, e.getMessage)
} }
} }
@ -658,7 +665,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader))) currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
} }
} catch { } catch {
case e: Exception => case e: Exception =>
EventHandler.error(e, this, e.getMessage) EventHandler.error(e, this, e.getMessage)
notifyListeners(RemoteServerError(e, this)) notifyListeners(RemoteServerError(e, this))
} }
@ -1049,7 +1056,7 @@ class RemoteServerHandler(
write(channel, RemoteEncoder.encode(messageBuilder.build)) write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch { } catch {
case e: Exception => case e: Exception =>
EventHandler.error(e, this, e.getMessage) EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
} }
@ -1267,4 +1274,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
} }
} }
} }

View file

@ -65,10 +65,13 @@ trait NetworkFailureTest { self: WordSpec =>
import akka.actor.Actor._ import akka.actor.Actor._
import akka.util.Duration import akka.util.Duration
// override is subclass if needed
val BYTES_PER_SECOND = "60KByte/s" val BYTES_PER_SECOND = "60KByte/s"
val DELAY_MILLIS = "350ms" val DELAY_MILLIS = "350ms"
val PORT_RANGE = "1024-65535" val PORT_RANGE = "1024-65535"
// FIXME add support for TCP FIN by hooking into Netty and do socket.close
def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = { def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean) = {
spawn { spawn {
try { try {
@ -144,4 +147,4 @@ trait NetworkFailureTest { self: WordSpec =>
assert(new ProcessBuilder("ipfw", "flush").start.waitFor == 0) assert(new ProcessBuilder("ipfw", "flush").start.waitFor == 0)
assert(new ProcessBuilder("ipfw", "pipe", "flush").start.waitFor == 0) assert(new ProcessBuilder("ipfw", "pipe", "flush").start.waitFor == 0)
} }
} }

View file

@ -11,7 +11,7 @@ akka {
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
time-unit = "seconds" # Time unit for all timeout properties throughout the config time-unit = "seconds" # Time unit for all timeout properties throughout the config
event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
@ -146,6 +146,7 @@ akka {
} }
client { client {
resend-on-failure = true
reconnect-delay = 5 reconnect-delay = 5
read-timeout = 10 read-timeout = 10
message-frame-size = 1048576 message-frame-size = 1048576