From 2d0ca684eba4737f1a6bf35fc150a5d0526603d0 Mon Sep 17 00:00:00 2001 From: jboner Date: Thu, 22 Oct 2009 11:14:36 +0200 Subject: [PATCH] Added reconnection handler and config to RemoteClient --- .../src/main/scala/nio/RemoteClient.scala | 70 +++-- .../src/main/scala/nio/RemoteServer.scala | 6 +- .../main/scala/stm/TransactionalState.scala | 3 +- .../src/test/scala/InMemoryActorSpec.scala | 2 +- akka.iws | 243 ++++++++++++------ config/akka-reference.conf | 14 +- 6 files changed, 223 insertions(+), 115 deletions(-) diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala index 3df4d24fe8..6ac441dfa7 100644 --- a/akka-actors/src/main/scala/nio/RemoteClient.scala +++ b/akka-actors/src/main/scala/nio/RemoteClient.scala @@ -4,32 +4,40 @@ package se.scalablesolutions.akka.nio -import java.net.InetSocketAddress -import java.util.concurrent.{Executors, ConcurrentMap, ConcurrentHashMap} +import scala.collection.mutable.HashMap import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} -import actor.{Exit, Actor} -import dispatch.{DefaultCompletableFutureResult, CompletableFutureResult} -import serialization.{Serializer, Serializable, SerializationProtocol} -import util.Logging +import se.scalablesolutions.akka.actor.{Exit, Actor} +import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult} +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.Config.config import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel._ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} +import org.jboss.netty.handler.timeout.ReadTimeoutHandler +import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer} -import scala.collection.mutable.HashMap +import java.net.InetSocketAddress +import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} /** * @author Jonas Bonér */ object RemoteClient extends Logging { + val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000) + val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000) + + // TODO: add configuration optons: 'HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel)' + private[akka] val TIMER = new HashedWheelTimer private val clients = new HashMap[String, RemoteClient] + def clientFor(address: InetSocketAddress): RemoteClient = synchronized { val hostname = address.getHostName val port = address.getPort - val hash = hostname + ":" + port + val hash = hostname + ':' + port if (clients.contains(hash)) clients(hash) else { val client = new RemoteClient(hostname, port) @@ -45,7 +53,7 @@ object RemoteClient extends Logging { */ class RemoteClient(hostname: String, port: Int) extends Logging { val name = "RemoteClient@" + hostname - + @volatile private var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] private val supervisors = new ConcurrentHashMap[String, Actor] @@ -57,7 +65,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { private val bootstrap = new ClientBootstrap(channelFactory) - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors)) + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -115,15 +123,17 @@ class RemoteClient(hostname: String, port: Int) extends Logging { */ class RemoteClientPipelineFactory(name: String, futures: ConcurrentMap[Long, CompletableFutureResult], - supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory { + supervisors: ConcurrentMap[String, Actor], + bootstrap: ClientBootstrap) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - val p = Channels.pipeline() - p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); - p.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance)); - p.addLast("frameEncoder", new LengthFieldPrepender(4)); - p.addLast("protobufEncoder", new ProtobufEncoder()); - p.addLast("handler", new RemoteClientHandler(name, futures, supervisors)) - p + val pipeline = Channels.pipeline() + pipeline.addLast("timeout", new ReadTimeoutHandler(RemoteClient.TIMER, RemoteClient.READ_TIMEOUT)) + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)) + pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance)) + pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) + pipeline.addLast("protobufEncoder", new ProtobufEncoder()) + pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap)) + pipeline } } @@ -133,7 +143,8 @@ class RemoteClientPipelineFactory(name: String, @ChannelPipelineCoverage { val value = "all" } class RemoteClientHandler(val name: String, val futures: ConcurrentMap[Long, CompletableFutureResult], - val supervisors: ConcurrentMap[String, Actor]) + val supervisors: ConcurrentMap[String, Actor], + val bootstrap: ClientBootstrap) extends SimpleChannelUpstreamHandler with Logging { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -148,13 +159,13 @@ class RemoteClientHandler(val name: String, val result = event.getMessage if (result.isInstanceOf[RemoteReply]) { val reply = result.asInstanceOf[RemoteReply] - log.debug("Received RemoteReply[\n%s]", reply.toString) + log.debug("Remote client received RemoteReply[\n%s]", reply.toString) val future = futures.get(reply.getId) if (reply.getIsSuccessful) { val message = RemoteProtocolBuilder.getMessage(reply) future.completeWithResult(message) } else { - if (reply.hasSupervisorUuid) { + if (reply.hasSupervisorUuid()) { val supervisorUuid = reply.getSupervisorUuid if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") val supervisedActor = supervisors.get(supervisorUuid) @@ -172,7 +183,22 @@ class RemoteClientHandler(val name: String, } } - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) { + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + RemoteClient.TIMER.newTimeout(new TimerTask() { + def run(timeout: Timeout) = { + log.debug("Remote client reconnecting to [%s]", ctx.getChannel.getRemoteAddress) + bootstrap.connect + } + }, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS) + } + + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = + log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) + + override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = + log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress); + + override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { log.error("Unexpected exception from downstream in remote client: %s", event.getCause) event.getCause.printStackTrace event.getChannel.close diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala index aa327dfbc8..c2471054ee 100755 --- a/akka-actors/src/main/scala/nio/RemoteServer.scala +++ b/akka-actors/src/main/scala/nio/RemoteServer.scala @@ -23,9 +23,9 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} * @author Jonas Bonér */ object RemoteServer extends Logging { - val HOSTNAME = config.getString("akka.remote.hostname", "localhost") - val PORT = config.getInt("akka.remote.port", 9999) - val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000) + val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") + val PORT = config.getInt("akka.remote.server.port", 9999) + val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) private var hostname = HOSTNAME private var port = PORT diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala index 5ca5f723b1..34758de6fa 100644 --- a/akka-actors/src/main/scala/stm/TransactionalState.scala +++ b/akka-actors/src/main/scala/stm/TransactionalState.scala @@ -78,11 +78,10 @@ object TransactionalRef { class TransactionalRef[T] extends Transactional { import org.multiverse.utils.TransactionThreadLocal._ - println("---- create TX " + getThreadLocalTransaction) private[this] val ref: Ref[T] = new Ref[T]//Ref.createCommittedRef[T] - def swap(elem: T) = ref.set(elem) + def get: Option[T] = { if (ref.isNull) None else Some(ref.get) diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala index d6186043ac..23d8f2c43b 100644 --- a/akka-actors/src/test/scala/InMemoryActorSpec.scala +++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala @@ -26,7 +26,7 @@ case class FailureOneWay(key: String, value: String, failer: Actor) class InMemStatefulActor extends Actor { timeout = 100000 makeTransactionRequired - //dispatcher = se.scalablesolutions.akka.dispatch.Dispatchers.newThreadBasedDispatcher(this) + private lazy val mapState: TransactionalMap[String, String] = TransactionalState.newMap[String, String] private lazy val vectorState: TransactionalVector[String] = TransactionalState.newVector[String] private lazy val refState: TransactionalRef[String] = TransactionalState.newRef[String] diff --git a/akka.iws b/akka.iws index adb7a8b56a..5ae1ca3ef2 100644 --- a/akka.iws +++ b/akka.iws @@ -5,9 +5,12 @@ - - + + + + + @@ -238,37 +241,91 @@ - - + + - + - - + + - + - - + + - + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -289,10 +346,6 @@ @@ -854,11 +911,11 @@ - - - - - + + + + + localhost @@ -899,14 +956,14 @@ - + - + @@ -958,68 +1015,11 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1048,6 +1048,83 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +