From d46a4b6916728eb711fb4088a87fc4ba6f999ed9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 11 Dec 2012 13:08:36 +0100 Subject: [PATCH] - Removed usage of null from Endpoint - Fixed race resolution logic in FailureDetectorRegistry to not create unnecessary instances --- .../DefaultFailureDetectorRegistry.scala | 18 +++++++---- .../src/main/scala/akka/remote/Endpoint.scala | 17 +++++++---- .../scala/akka/remote/MessageSerializer.scala | 2 +- .../remote/PhiAccrualFailureDetector.scala | 4 +-- .../akka/remote/RemoteActorRefProvider.scala | 2 +- .../src/main/scala/akka/remote/Remoting.scala | 30 ++++++++++++------- .../transport/AbstractTransportAdapter.scala | 4 +-- .../akka/remote/transport/AkkaPduCodec.scala | 3 +- .../transport/AkkaProtocolTransport.scala | 2 +- .../akka/remote/transport/Transport.scala | 1 - .../remote/transport/netty/NettyHelpers.scala | 2 +- 11 files changed, 52 insertions(+), 33 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala index 1495154a27..63c51ef2b8 100644 --- a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala +++ b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala @@ -32,16 +32,22 @@ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetec @tailrec final override def heartbeat(resource: A): Unit = { - val oldTable = resourceToFailureDetector.get - - oldTable.get(resource) match { + resourceToFailureDetector.get.get(resource) match { case Some(failureDetector) ⇒ failureDetector.heartbeat() case None ⇒ // First one wins and creates the new FailureDetector if (failureDectorCreationLock.tryLock()) try { - val newDetector: FailureDetector = detectorFactory() - newDetector.heartbeat() - resourceToFailureDetector.set(oldTable + (resource -> newDetector)) + // First check for non-existing key was outside the lock, and a second thread might just released the lock + // when this one acquired it, so the second check is needed. + val oldTable = resourceToFailureDetector.get + oldTable.get(resource) match { + case Some(failureDetector) ⇒ + failureDetector.heartbeat() + case None ⇒ + val newDetector: FailureDetector = detectorFactory() + newDetector.heartbeat() + resourceToFailureDetector.set(oldTable + (resource -> newDetector)) + } } finally failureDectorCreationLock.unlock() else heartbeat(resource) // The thread that lost the race will try to reread } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 2b45c7be92..dd1a5074a3 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -93,8 +93,9 @@ private[remote] object EndpointWriter { /** * This message signals that the current association maintained by the local EndpointWriter and EndpointReader is * to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the - * same remote endpoint. - * @param handle + * same remote endpoint: when a parallel inbound association is detected, the old one is removed and the new one is + * used instead. + * @param handle Handle of the new inbound association. */ case class TakeOver(handle: AssociationHandle) case object BackoffTimer @@ -130,7 +131,7 @@ private[remote] class EndpointWriter( override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) } val msgDispatch = - new DefaultMessageDispatcher(extendedSystem, extendedSystem.provider.asInstanceOf[RemoteActorRefProvider], log) + new DefaultMessageDispatcher(extendedSystem, RARP(extendedSystem).provider, log) def inbound = handle.isDefined @@ -238,8 +239,12 @@ private[remote] class EndpointWriter( private def startReadEndpoint(): Unit = handle match { case Some(h) ⇒ - reader = Some(context.watch(context.actorOf(Props(new EndpointReader(codec, h.localAddress, msgDispatch)), - "endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + readerId.next()))) + val readerLocalAddress = h.localAddress + val readerCodec = codec + val readerDispatcher = msgDispatch + reader = Some( + context.watch(context.actorOf(Props(new EndpointReader(readerCodec, readerLocalAddress, readerDispatcher)), + "endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + readerId.next()))) h.readHandlerPromise.success(reader.get) case None ⇒ throw new EndpointException("Internal error: No handle was present during creation of the endpoint" + "reader.", null) @@ -261,7 +266,7 @@ private[remote] class EndpointReader( val localAddress: Address, val msgDispatch: InboundMessageDispatcher) extends Actor { - val provider = context.system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] + val provider = RARP(context.system).provider override def receive: Receive = { case Disassociated ⇒ context.stop(self) diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index b459a48c98..f028757ff7 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -12,7 +12,7 @@ import akka.actor.ExtendedActorSystem import akka.serialization.SerializationExtension /** - * MessageSerializer is a helper for serialize and deserialize messages + * MessageSerializer is a helper for serializing and deserialize messages */ private[akka] object MessageSerializer { diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index 1c12bec87f..b0525b48d8 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -170,10 +170,10 @@ private[akka] case class HeartbeatHistory private ( intervalSum: Long, squaredIntervalSum: Long) { + // Heartbeat histories are created trough the firstHeartbeat variable of the PhiAccrualFailureDetector + // which always have intervals.size > 0. if (maxSampleSize < 1) throw new IllegalArgumentException(s"maxSampleSize must be >= 1, got [$maxSampleSize]") - if (intervals.size == 0) - throw new IllegalArgumentException("intervals.size must be > 0") if (intervalSum < 0L) throw new IllegalArgumentException(s"intervalSum must be >= 0, got [$intervalSum]") if (squaredIntervalSum < 0L) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index ad6fb98ff9..3e2f25f3d5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -227,7 +227,7 @@ class RemoteActorRefProvider( def getExternalAddressFor(addr: Address): Option[Address] = { addr match { - case _ if hasAddress(addr) ⇒ Some(local.rootPath.address) + case _ if hasAddress(addr) ⇒ Some(local.rootPath.address) case Address("akka", _, Some(_), Some(_)) ⇒ Some(transport.localAddressForRemote(addr)) case _ ⇒ None } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index d26bec609e..6cbea785e6 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -19,7 +19,7 @@ import java.util.concurrent.TimeoutException import scala.util.{ Failure, Success } import scala.collection.immutable import akka.japi.Util.immutableSeq -import akka.remote.Remoting.RegisterTransportActor +import akka.remote.Remoting.{ TransportSupervisor, RegisterTransportActor } class RemotingSettings(val config: Config) { @@ -60,6 +60,14 @@ class RemotingSettings(val config: Config) { cfg.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } } +private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension +private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider { + + override def lookup() = RARP + + override def createExtension(system: ExtendedActorSystem) = RARP(system.provider.asInstanceOf[RemoteActorRefProvider]) +} + private[remote] object Remoting { final val EndpointManagerName = "endpointManager" @@ -94,6 +102,16 @@ private[remote] object Remoting { case class RegisterTransportActor(props: Props, name: String) + private[Remoting] class TransportSupervisor extends Actor { + override def supervisorStrategy = OneForOneStrategy() { + case NonFatal(e) ⇒ Restart + } + + def receive = { + case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props, name) + } + } + } private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { @@ -106,15 +124,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc private val settings = new RemotingSettings(provider.remoteSettings.config) - val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { - override def supervisorStrategy = OneForOneStrategy() { - case NonFatal(e) ⇒ Restart - } - - def receive = { - case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props, name) - } - }), "transports") + val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[TransportSupervisor], "transports") override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote) diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 17105a1df5..271d86f2ea 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -4,7 +4,7 @@ import scala.language.postfixOps import akka.actor._ import akka.pattern.ask import akka.remote.transport.Transport._ -import akka.remote.{ RemotingSettings, RemoteActorRefProvider } +import akka.remote.{ RARP, RemotingSettings, RemoteActorRefProvider } import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Promise, Future } import scala.util.Success @@ -16,7 +16,7 @@ import scala.concurrent.duration._ trait TransportAdapterProvider extends ((Transport, ExtendedActorSystem) ⇒ Transport) class TransportAdapters(system: ExtendedActorSystem) extends Extension { - val settings = new RemotingSettings(system.provider.asInstanceOf[RemoteActorRefProvider].remoteSettings.config) + val settings = new RemotingSettings(RARP(system).provider.remoteSettings.config) private val adaptersTable: Map[String, TransportAdapterProvider] = for ((name, fqn) ← settings.Adapters) yield { name -> system.dynamicAccess.createInstanceFor[TransportAdapterProvider](fqn, immutable.Seq.empty).recover({ diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index 99cdd99fcc..9bbc2d73ce 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -138,8 +138,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefProtocol = { ActorRefProtocol.newBuilder.setPath( - if(ref.path.address.host.isDefined) ref.path.toString else ref.path.toStringWithAddress(defaultAddress) - ).build() + if (ref.path.address.host.isDefined) ref.path.toString else ref.path.toStringWithAddress(defaultAddress)).build() } private def serializeAddress(address: Address): Option[AddressProtocol] = { diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 1517c495db..aab2722cc6 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -293,7 +293,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat if (settings.WaitActivityEnabled) goto(WaitActivity) using OutboundUnderlyingAssociated(statusPromise, wrappedHandle) else - goto(Open) using AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) + goto(Open) using AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue.empty) case Event(DisassociateUnderlying, _) ⇒ stop() diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 7a5798069b..4cdc2f47db 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -5,7 +5,6 @@ import akka.actor.{ ActorRef, Address } import akka.util.ByteString import akka.remote.transport.AssociationHandle.HandleEventListener - object Transport { trait AssociationEvent diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index 7d826c3776..dec126cd6a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -21,7 +21,7 @@ private[netty] trait NettyHelpers { val cause = if (ev.getCause ne null) ev.getCause else new AkkaException("Unknown cause") cause match { case _: ClosedChannelException ⇒ // Ignore - case null | NonFatal(e) ⇒ onException(ctx, ev) + case null | NonFatal(_) ⇒ onException(ctx, ev) case e: Throwable ⇒ throw e // Rethrow fatals } }