From 78c0b8164c074784c6341e217637104e4bd34e82 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 7 Oct 2010 12:15:59 +0200 Subject: [PATCH] Fixing UUID remote request bug --- akka-actor/src/main/scala/util/LockUtil.scala | 58 +++++++++++++++ .../src/main/scala/remote/RemoteClient.scala | 72 +++++++++---------- .../serialization/SerializationProtocol.scala | 4 +- .../ClientInitiatedRemoteActorSpec.scala | 13 ++++ 4 files changed, 107 insertions(+), 40 deletions(-) diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 3d1261e468..6df0695f03 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -111,4 +111,62 @@ class SimpleLock { def unlock() { acquired.set(false) } +} + +/** + * An atomic switch that can be either on or off + */ +class Switch(startAsOn: Boolean = false) { + private val switch = new AtomicBoolean(startAsOn) + + protected def transcend(from: Boolean,action: => Unit): Boolean = { + if (switch.compareAndSet(from,!from)) { + try { + action + } catch { + case t => + switch.compareAndSet(!from,from) //Revert status + throw t + } + true + } else false + } + + def switchOff(action: => Unit): Boolean = transcend(from = true, action) + def switchOn(action: => Unit): Boolean = transcend(from = false,action) + + def ifOnYield[T](action: => T): Option[T] = { + if (switch.get) + Some(action) + else + None + } + + def ifOffYield[T](action: => T): Option[T] = { + if (switch.get) + Some(action) + else + None + } + + def ifOn(action: => Unit): Boolean = { + if (switch.get) { + action + true + } + else + false + } + + def ifOff(action: => Unit): Boolean = { + if (!switch.get) { + action + true + } + else + false + } + + def isOn = switch.get + def isOff = !isOn } \ No newline at end of file diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index 2694c9aee8..e39b83a503 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} import scala.reflect.BeanProperty import se.scalablesolutions.akka.actor._ -import se.scalablesolutions.akka.util.{Address, ListenerManagement, Logging, Duration} +import se.scalablesolutions.akka.util._ /** * Life-cycle events for RemoteClient. @@ -200,56 +200,52 @@ class RemoteClient private[akka] ( private val remoteAddress = new InetSocketAddress(hostname, port) //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) - @volatile private[remote] var isRunning = false @volatile private var bootstrap: ClientBootstrap = _ @volatile private[remote] var connection: ChannelFuture = _ @volatile private[remote] var openChannels: DefaultChannelGroup = _ @volatile private var timer: HashedWheelTimer = _ + private[remote] val runSwitch = new Switch() + + private[remote] def isRunning = runSwitch.isOn private val reconnectionTimeWindow = Duration(config.getInt( "akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis @volatile private var reconnectionTimeWindowStart = 0L - def connect = synchronized { - if (!isRunning) { - openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName) - timer = new HashedWheelTimer - bootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool,Executors.newCachedThreadPool - ) + def connect = runSwitch switchOn { + openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName) + timer = new HashedWheelTimer + bootstrap = new ClientBootstrap( + new NioClientSocketChannelFactory( + Executors.newCachedThreadPool,Executors.newCachedThreadPool ) - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("keepAlive", true) - connection = bootstrap.connect(remoteAddress) - log.info("Starting remote client connection to [%s:%s]", hostname, port) - // Wait until the connection attempt succeeds or fails. - val channel = connection.awaitUninterruptibly.getChannel - openChannels.add(channel) - if (!connection.isSuccess) { - notifyListeners(RemoteClientError(connection.getCause, this)) - log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) - } - notifyListeners(RemoteClientStarted(this)) - isRunning = true + ) + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("keepAlive", true) + connection = bootstrap.connect(remoteAddress) + log.info("Starting remote client connection to [%s:%s]", hostname, port) + // Wait until the connection attempt succeeds or fails. + val channel = connection.awaitUninterruptibly.getChannel + openChannels.add(channel) + if (!connection.isSuccess) { + notifyListeners(RemoteClientError(connection.getCause, this)) + log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } + notifyListeners(RemoteClientStarted(this)) } - def shutdown = synchronized { + def shutdown = runSwitch switchOff { log.info("Shutting down %s", name) - if (isRunning) { - isRunning = false - notifyListeners(RemoteClientShutdown(this)) - timer.stop - timer = null - openChannels.close.awaitUninterruptibly - openChannels = null - bootstrap.releaseExternalResources - bootstrap = null - connection = null - log.info("%s has been shut down", name) - } + notifyListeners(RemoteClientShutdown(this)) + timer.stop + timer = null + openChannels.close.awaitUninterruptibly + openChannels = null + bootstrap.releaseExternalResources + bootstrap = null + connection = null + log.info("%s has been shut down", name) } @deprecated("Use addListener instead") @@ -423,7 +419,7 @@ class RemoteClientHandler( } } - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = if (client.isRunning) { + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn { if (client.isWithinReconnectionTimeWindow) { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = { diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 427c3ad721..847911c43f 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -285,9 +285,9 @@ object RemoteActorSerialization { case ActorType.TypedActor => actorInfoBuilder.setActorType(TYPED_ACTOR) } val actorInfo = actorInfoBuilder.build - + val requestUuid = newUuid val requestBuilder = RemoteRequestProtocol.newBuilder - .setUuid(UuidProtocol.newBuilder.setHigh(uuid.getTime).setLow(uuid.getClockSeqAndNode).build) + .setUuid(UuidProtocol.newBuilder.setHigh(requestUuid.getTime).setLow(requestUuid.getClockSeqAndNode).build) .setMessage(MessageSerializer.serialize(message)) .setActorInfo(actorInfo) .setIsOneWay(isOneWay) diff --git a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index d39b58d41d..284ba0b87b 100644 --- a/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -123,6 +123,19 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite { actor.stop } + @Test + def shouldSendBangBangMessageAndReceiveReplyConcurrently = { + val actors = (1 to 10). + map(num => { + val a = actorOf[RemoteActorSpecActorBidirectional] + a.makeRemote(HOSTNAME, PORT1) + a.start + }).toList + actors.map(_ !!! "Hello"). + foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get)) + actors.foreach(_.stop) + } + @Test def shouldSendAndReceiveRemoteException { implicit val timeout = 500000000L