From 90f7e0ea4e8d9383abfbe99e3471c91c7ab8ed96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 29 Dec 2009 14:24:48 +0100 Subject: [PATCH] Fixed bug in RemoteClient reconnect, now works flawlessly + Added option to declaratively configure an Actor to be remote --- .../src/main/scala/actor/Supervisor.scala | 2 +- akka-core/src/main/scala/config/Config.scala | 10 +++++++++- .../src/main/scala/remote/RemoteClient.scala | 19 +++++++++++-------- .../src/main/scala/ChatServer.scala | 3 ++- .../src/main/scala/akka/SimpleService.scala | 3 +-- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 23c1c55998..5aa3ec2183 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -112,7 +112,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep case SupervisorConfig(_, servers) => servers.map(server => server match { - case Supervise(actor, lifeCycle) => + case Supervise(actor, lifeCycle, remoteAddress) => actors.put(actor.getClass.getName, actor) actor.lifeCycle = Some(lifeCycle) startLink(actor) diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala index 8a4239ef9c..828b7cd516 100644 --- a/akka-core/src/main/scala/config/Config.scala +++ b/akka-core/src/main/scala/config/Config.scala @@ -24,7 +24,15 @@ object ScalaConfig { abstract class Scope extends ConfigElement case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server - case class Supervise(actor: Actor, lifeCycle: LifeCycle) extends Server + + class Supervise(val actor: Actor, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) + } + object Supervise { + def apply(actor: Actor, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actor, lifeCycle, remoteAddress) + def apply(actor: Actor, lifeCycle: LifeCycle) = new Supervise(actor, lifeCycle, null) + def unapply(supervise: Supervise) = Some((supervise.actor, supervise.lifeCycle, supervise.remoteAddress)) + } case class RestartStrategy( scheme: FailOverScheme, diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index c849ed94ec..995f53ef32 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -21,7 +21,7 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import org.jboss.netty.handler.timeout.ReadTimeoutHandler import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer} -import java.net.InetSocketAddress +import java.net.{SocketAddress, InetSocketAddress} import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicLong @@ -84,8 +84,9 @@ class RemoteClient(hostname: String, port: Int) extends Logging { private val bootstrap = new ClientBootstrap(channelFactory) private val timer = new HashedWheelTimer + private val remoteAddress = new InetSocketAddress(hostname, port) - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, timer)) + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -93,7 +94,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { def connect = synchronized { if (!isRunning) { - connection = bootstrap.connect(new InetSocketAddress(hostname, port)) + connection = bootstrap.connect(remoteAddress) log.info("Starting remote client connection to [%s:%s]", hostname, port) // Wait until the connection attempt succeeds or fails. @@ -146,7 +147,8 @@ class RemoteClientPipelineFactory(name: String, futures: ConcurrentMap[Long, CompletableFutureResult], supervisors: ConcurrentMap[String, Actor], bootstrap: ClientBootstrap, - timer: HashedWheelTimer) extends ChannelPipelineFactory { + remoteAddress: SocketAddress, + timer: HashedWheelTimer) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val pipeline = Channels.pipeline() pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)) @@ -164,7 +166,7 @@ class RemoteClientPipelineFactory(name: String, } pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) pipeline.addLast("protobufEncoder", new ProtobufEncoder()) - pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, timer)) + pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer)) pipeline } } @@ -177,7 +179,8 @@ class RemoteClientHandler(val name: String, val futures: ConcurrentMap[Long, CompletableFutureResult], val supervisors: ConcurrentMap[String, Actor], val bootstrap: ClientBootstrap, - val timer: HashedWheelTimer) + val remoteAddress: SocketAddress, + val timer: HashedWheelTimer) extends SimpleChannelUpstreamHandler with Logging { import Actor.Sender.Self @@ -221,8 +224,8 @@ class RemoteClientHandler(val name: String, override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = { - log.debug("Remote client reconnecting to [%s]", ctx.getChannel.getRemoteAddress) - bootstrap.connect + log.debug("Remote client reconnecting to [%s]", remoteAddress) + bootstrap.connect(remoteAddress) } }, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS) } diff --git a/akka-samples-chat/src/main/scala/ChatServer.scala b/akka-samples-chat/src/main/scala/ChatServer.scala index 76dc175f82..9a7a669f00 100644 --- a/akka-samples-chat/src/main/scala/ChatServer.scala +++ b/akka-samples-chat/src/main/scala/ChatServer.scala @@ -170,7 +170,8 @@ class Boot { RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), Supervise( ChatService, - LifeCycle(Permanent)) + LifeCycle(Permanent), + RemoteAddress("localhost", 9999)) :: Nil)) factory.newInstance.start } diff --git a/akka-samples-lift/src/main/scala/akka/SimpleService.scala b/akka-samples-lift/src/main/scala/akka/SimpleService.scala index 4f23ef965a..471c8f12b3 100644 --- a/akka-samples-lift/src/main/scala/akka/SimpleService.scala +++ b/akka-samples-lift/src/main/scala/akka/SimpleService.scala @@ -47,8 +47,7 @@ class SimpleService extends Transactor { * Or browse to the URL from a web browser. */ @Path("/persistentliftcount") -class PersistentSimpleService extends Actor { - makeTransactionRequired +class PersistentSimpleService extends Transactor { case object Tick private val KEY = "COUNTER"