From ba134148ee057b1b8abedbbf59778d45dc383406 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Sat, 21 Aug 2010 13:07:22 +0200 Subject: [PATCH] Added support for reconnection-time-window for RemoteClient, configurable through akka-reference.conf --- .../src/main/scala/remote/RemoteClient.scala | 58 ++++++++++++++----- config/akka-reference.conf | 1 + 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index aa36ee9038..f7f833292f 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -55,6 +55,9 @@ case class RemoteClientDisconnected( case class RemoteClientConnected( @BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientStopped( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent class RemoteClientException private[akka](message: String) extends AkkaException(message) @@ -151,7 +154,6 @@ object RemoteClient extends Logging { actorsFor(RemoteServer.Address(hostname, port)) += uuid } - // TODO: add RemoteClient.unregister for TypedActor, but first need a @shutdown callback private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized { val set = actorsFor(RemoteServer.Address(hostname, port)) set -= uuid @@ -174,7 +176,8 @@ object RemoteClient extends Logging { * * @author Jonas Bonér */ -class RemoteClient private[akka] (val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) extends Logging with ListenerManagement { +class RemoteClient private[akka] (val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) + extends Logging with ListenerManagement { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false @@ -192,6 +195,10 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade private[remote] var connection: ChannelFuture = _ private[remote] val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName); + private val reconnectionTimeWindow = + Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis + @volatile private var reconnectionTimeWindowStart = 0L + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -212,12 +219,14 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade } def shutdown = synchronized { + log.info("Shutting down %s", name) if (isRunning) { isRunning = false + timer.stop openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources - timer.stop log.info("%s has been shut down", name) + foreachListener(l => l ! RemoteClientStopped(hostname, port)) } } @@ -259,6 +268,21 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( "Can't unregister supervisor for " + actorRef + " since it is not under supervision") else supervisors.remove(actorRef.supervisor.get.uuid) + + private[akka] def isWithinReconnectionTimeWindow: Boolean = { + if (reconnectionTimeWindowStart == 0L) { + reconnectionTimeWindowStart = System.currentTimeMillis + true + } else { + val timeLeft = reconnectionTimeWindow - (System.currentTimeMillis - reconnectionTimeWindowStart) + if (timeLeft > 0) { + log.info("Will try to reconnect to remote server for another [%s] milliseconds", timeLeft) + true + } else false + } + } + + private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L } /** @@ -359,25 +383,29 @@ class RemoteClientHandler( } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = if (client.isRunning) { - timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = { - client.openChannels.remove(event.getChannel) - log.debug("Remote client reconnecting to [%s]", remoteAddress) - client.connection = bootstrap.connect(remoteAddress) - client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. - if (!client.connection.isSuccess) { - client.foreachListener(l => - l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port)) - log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) + if (client.isWithinReconnectionTimeWindow) { + println("---- RECONNECT") + timer.newTimeout(new TimerTask() { + def run(timeout: Timeout) = { + client.openChannels.remove(event.getChannel) + log.debug("Remote client reconnecting to [%s]", remoteAddress) + client.connection = bootstrap.connect(remoteAddress) + client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. + if (!client.connection.isSuccess) { + client.foreachListener(l => + l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port)) + log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) + } } - } - }, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) + }, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) + } else client.shutdown } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { client.foreachListener(l => l ! RemoteClientConnected(client.hostname, client.port)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) + client.resetReconnectionTimeWindow } if (RemoteServer.SECURE) { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index e741fccd23..a879da649b 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -120,6 +120,7 @@ akka { client { reconnect-delay = 5 read-timeout = 10 + reconnection-time-window = 600 # the maximum time window that a client should try to reconnect for } }