diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala
index 3df4d24fe8..6ac441dfa7 100644
--- a/akka-actors/src/main/scala/nio/RemoteClient.scala
+++ b/akka-actors/src/main/scala/nio/RemoteClient.scala
@@ -4,32 +4,40 @@
package se.scalablesolutions.akka.nio
-import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, ConcurrentMap, ConcurrentHashMap}
+import scala.collection.mutable.HashMap
import protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
-import actor.{Exit, Actor}
-import dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
-import serialization.{Serializer, Serializable, SerializationProtocol}
-import util.Logging
+import se.scalablesolutions.akka.actor.{Exit, Actor}
+import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
+import se.scalablesolutions.akka.util.Logging
+import se.scalablesolutions.akka.Config.config
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler
+import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer}
-import scala.collection.mutable.HashMap
+import java.net.InetSocketAddress
+import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
/**
* @author Jonas Bonér
*/
object RemoteClient extends Logging {
+ val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000)
+ val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)
+
+ // TODO: add configuration optons: 'HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel)'
+ private[akka] val TIMER = new HashedWheelTimer
private val clients = new HashMap[String, RemoteClient]
+
def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
val hostname = address.getHostName
val port = address.getPort
- val hash = hostname + ":" + port
+ val hash = hostname + ':' + port
if (clients.contains(hash)) clients(hash)
else {
val client = new RemoteClient(hostname, port)
@@ -45,7 +53,7 @@ object RemoteClient extends Logging {
*/
class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname
-
+
@volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val supervisors = new ConcurrentHashMap[String, Actor]
@@ -57,7 +65,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
private val bootstrap = new ClientBootstrap(channelFactory)
- bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors))
+ bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@@ -115,15 +123,17 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
*/
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
- supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory {
+ supervisors: ConcurrentMap[String, Actor],
+ bootstrap: ClientBootstrap) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
- val p = Channels.pipeline()
- p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
- p.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance));
- p.addLast("frameEncoder", new LengthFieldPrepender(4));
- p.addLast("protobufEncoder", new ProtobufEncoder());
- p.addLast("handler", new RemoteClientHandler(name, futures, supervisors))
- p
+ val pipeline = Channels.pipeline()
+ pipeline.addLast("timeout", new ReadTimeoutHandler(RemoteClient.TIMER, RemoteClient.READ_TIMEOUT))
+ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
+ pipeline.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance))
+ pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
+ pipeline.addLast("protobufEncoder", new ProtobufEncoder())
+ pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap))
+ pipeline
}
}
@@ -133,7 +143,8 @@ class RemoteClientPipelineFactory(name: String,
@ChannelPipelineCoverage { val value = "all" }
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
- val supervisors: ConcurrentMap[String, Actor])
+ val supervisors: ConcurrentMap[String, Actor],
+ val bootstrap: ClientBootstrap)
extends SimpleChannelUpstreamHandler with Logging {
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
@@ -148,13 +159,13 @@ class RemoteClientHandler(val name: String,
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
- log.debug("Received RemoteReply[\n%s]", reply.toString)
+ log.debug("Remote client received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId)
if (reply.getIsSuccessful) {
val message = RemoteProtocolBuilder.getMessage(reply)
future.completeWithResult(message)
} else {
- if (reply.hasSupervisorUuid) {
+ if (reply.hasSupervisorUuid()) {
val supervisorUuid = reply.getSupervisorUuid
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
@@ -172,7 +183,22 @@ class RemoteClientHandler(val name: String,
}
}
- override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) {
+ override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
+ RemoteClient.TIMER.newTimeout(new TimerTask() {
+ def run(timeout: Timeout) = {
+ log.debug("Remote client reconnecting to [%s]", ctx.getChannel.getRemoteAddress)
+ bootstrap.connect
+ }
+ }, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS)
+ }
+
+ override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
+ log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
+
+ override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
+ log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress);
+
+ override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.error("Unexpected exception from downstream in remote client: %s", event.getCause)
event.getCause.printStackTrace
event.getChannel.close
diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala
index aa327dfbc8..c2471054ee 100755
--- a/akka-actors/src/main/scala/nio/RemoteServer.scala
+++ b/akka-actors/src/main/scala/nio/RemoteServer.scala
@@ -23,9 +23,9 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
* @author Jonas Bonér
*/
object RemoteServer extends Logging {
- val HOSTNAME = config.getString("akka.remote.hostname", "localhost")
- val PORT = config.getInt("akka.remote.port", 9999)
- val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000)
+ val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
+ val PORT = config.getInt("akka.remote.server.port", 9999)
+ val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
private var hostname = HOSTNAME
private var port = PORT
diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala
index 5ca5f723b1..34758de6fa 100644
--- a/akka-actors/src/main/scala/stm/TransactionalState.scala
+++ b/akka-actors/src/main/scala/stm/TransactionalState.scala
@@ -78,11 +78,10 @@ object TransactionalRef {
class TransactionalRef[T] extends Transactional {
import org.multiverse.utils.TransactionThreadLocal._
- println("---- create TX " + getThreadLocalTransaction)
private[this] val ref: Ref[T] = new Ref[T]//Ref.createCommittedRef[T]
-
def swap(elem: T) = ref.set(elem)
+
def get: Option[T] = {
if (ref.isNull) None
else Some(ref.get)
diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala
index d6186043ac..23d8f2c43b 100644
--- a/akka-actors/src/test/scala/InMemoryActorSpec.scala
+++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala
@@ -26,7 +26,7 @@ case class FailureOneWay(key: String, value: String, failer: Actor)
class InMemStatefulActor extends Actor {
timeout = 100000
makeTransactionRequired
- //dispatcher = se.scalablesolutions.akka.dispatch.Dispatchers.newThreadBasedDispatcher(this)
+
private lazy val mapState: TransactionalMap[String, String] = TransactionalState.newMap[String, String]
private lazy val vectorState: TransactionalVector[String] = TransactionalState.newVector[String]
private lazy val refState: TransactionalRef[String] = TransactionalState.newRef[String]
diff --git a/akka.iws b/akka.iws
index adb7a8b56a..5ae1ca3ef2 100644
--- a/akka.iws
+++ b/akka.iws
@@ -5,9 +5,12 @@
-
-
+
+
+
+
+
@@ -238,37 +241,91 @@
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -289,10 +346,6 @@
@@ -854,11 +911,11 @@
-
-
-
-
-
+
+
+
+
+
localhost
@@ -899,14 +956,14 @@
-
+
-
+
@@ -958,68 +1015,11 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1048,6 +1048,83 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index d80fa1cd0a..281853c106 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -37,10 +37,16 @@
- service = on
- hostname = "localhost"
- port = 9999
- connection-timeout = 1000 # in millis
+
+ service = on
+ hostname = "localhost"
+ port = 9999
+ connection-timeout = 1000 # in millis (1 sec default)
+
+
+ reconnect-delay = 5000 # in millis (5 sec default)
+ read-timeout = 10000 # in millis (10 sec default)
+