Renamed config option for remote client retry message send.

This commit is contained in:
Jonas Bonér 2011-03-28 08:42:46 +02:00
commit 0412ae4151
10 changed files with 464 additions and 88 deletions

View file

@ -5,46 +5,40 @@
package akka.remote.netty
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.japi.Creator
import akka.config.Config._
import akka.remoteinterface._
import akka.actor.{PoisonPill, Index,
ActorInitializationException, LocalActorRef, newUuid,
ActorRegistry, Actor, RemoteActorRef,
import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef,
TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException
import akka.event.EventHandler
import akka.actor.Actor._
import akka.config.Config._
import akka.util._
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.event.EventHandler
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap}
import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap}
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.{ HashMap }
import scala.reflect.BeanProperty
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.{ SocketAddress, InetSocketAddress }
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean}
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.concurrent._
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -157,29 +151,50 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
}
/**
* This is the abstract baseclass for netty remote clients,
* currently there's only an ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
* This is the abstract baseclass for netty remote clients, currently there's only an
* ActiveRemoteClient, but otehrs could be feasible, like a PassiveRemoteClient that
* reuses an already established connection.
*/
abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort
val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true)
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
private[remote] val runSwitch = new Switch()
val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
protected def notifyListeners(msg: => Any); Unit
protected def currentChannel: Channel
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
def shutdown: Boolean
/**
* Returns an array with the current pending messages not yet delivered.
*/
def pendingMessages: Array[Any] = {
var messages = Vector[Any]()
val iter = pendingRequests.iterator
while (iter.hasNext) {
val (_, _, message) = iter.next
messages = messages :+ MessageSerializer.deserialize(message.getMessage)
}
messages.toArray
}
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
@ -213,37 +228,52 @@ abstract class RemoteClient private[akka] (
* Sends the message across the wire
*/
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
val future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
try {
val future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
}
} catch {
case e: Throwable =>
// add the request to the tx log after a failing send
notifyListeners(RemoteClientError(e, module, remoteAddress))
if (useTransactionLog) pendingRequests.add((true, null, request))
else throw e
}
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
futures.remove(futureUuid) //Clean this up
//We don't care about that right now
} else if (!future.isSuccess) {
val f = futures.remove(futureUuid) //Clean this up
if (f ne null)
f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
}
}
})
Some(futureResult)
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
def handleRequestReplyError(future: ChannelFuture) = {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
if (useTransactionLog) {
pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
} else {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
}
}
var future: ChannelFuture = null
try {
// try to send the original one
future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
else if (!future.isSuccess) handleRequestReplyError(future)
} catch {
case e: Exception => handleRequestReplyError(future)
}
Some(futureResult)
}
} else {
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress)
@ -252,6 +282,34 @@ abstract class RemoteClient private[akka] (
}
}
private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log
val nrOfMessages = pendingRequests.size
if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
var pendingRequest = pendingRequests.peek
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) { // sendOneWay
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
throw future.getCause
}
} else { // sendRequestReply
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
else if (!future.isSuccess) {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
}
}
pendingRequests.remove(pendingRequest)
pendingRequest = pendingRequests.peek // try to grab next message
}
}
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't register supervisor for " + actorRef + " since it is not under supervision")
@ -272,6 +330,7 @@ class ActiveRemoteClient private[akka] (
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile private var bootstrap: ClientBootstrap = _
@volatile private[remote] var connection: ChannelFuture = _
@ -342,6 +401,7 @@ class ActiveRemoteClient private[akka] (
bootstrap.releaseExternalResources
bootstrap = null
connection = null
pendingRequests.clear
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@ -456,8 +516,16 @@ class ActiveRemoteClientHandler(
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
try {
if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
} catch {
case e: Throwable =>
EventHandler.error(e, this, e.getMessage)
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
throw e
}
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
@ -486,7 +554,7 @@ class ActiveRemoteClientHandler(
} catch {
case problem: Throwable =>
EventHandler.error(problem, this, problem.getMessage)
UnparsableException(classname, exception.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
}
@ -495,7 +563,8 @@ class ActiveRemoteClientHandler(
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule {
//Needed for remote testing and switching on/off under run
// Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
@ -564,7 +633,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.releaseExternalResources
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
}
}
@ -597,7 +666,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
notifyListeners(RemoteServerError(e, this))
}
@ -988,7 +1057,7 @@ class RemoteServerHandler(
write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch {
case e: Exception =>
case e: Exception =>
EventHandler.error(e, this, e.getMessage)
server.notifyListeners(RemoteServerError(e, server))
}
@ -1206,4 +1275,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
}
}
}
}