diff --git a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala index 2b3a6150e4..1495154a27 100644 --- a/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala +++ b/akka-remote/src/main/scala/akka/remote/DefaultFailureDetectorRegistry.scala @@ -4,10 +4,10 @@ package akka.remote -import akka.event.LoggingAdapter import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable.Map +import java.util.concurrent.locks.{ ReentrantLock, Lock } /** * A lock-less thread-safe implementation of [[akka.remote.FailureDetectorRegistry]]. @@ -18,67 +18,53 @@ import scala.collection.immutable.Map */ class DefaultFailureDetectorRegistry[A](val detectorFactory: () ⇒ FailureDetector) extends FailureDetectorRegistry[A] { - private val table = new AtomicReference[Map[A, FailureDetector]](Map()) + private val resourceToFailureDetector = new AtomicReference[Map[A, FailureDetector]](Map()) + private final val failureDectorCreationLock: Lock = new ReentrantLock /** * Returns true if the resource is considered to be up and healthy and returns false otherwise. For unregistered * resources it returns true. */ - final override def isAvailable(resource: A): Boolean = table.get.get(resource) match { + final override def isAvailable(resource: A): Boolean = resourceToFailureDetector.get.get(resource) match { case Some(r) ⇒ r.isAvailable case _ ⇒ true } - final override def heartbeat(resource: A): Unit = { + @tailrec final override def heartbeat(resource: A): Unit = { - // Second option parameter is there to avoid the unnecessary creation of failure detectors when a CAS loop happens - // Note, _one_ unnecessary detector might be created -- but no more. - @tailrec - def doHeartbeat(resource: A, detector: Option[FailureDetector]): Unit = { - val oldTable = table.get + val oldTable = resourceToFailureDetector.get - oldTable.get(resource) match { - case Some(failureDetector) ⇒ failureDetector.heartbeat() - case None ⇒ - val newDetector = detector getOrElse detectorFactory() - val newTable = oldTable + (resource -> newDetector) - if (!table.compareAndSet(oldTable, newTable)) - doHeartbeat(resource, Some(newDetector)) - else - newDetector.heartbeat() - } + oldTable.get(resource) match { + case Some(failureDetector) ⇒ failureDetector.heartbeat() + case None ⇒ + // First one wins and creates the new FailureDetector + if (failureDectorCreationLock.tryLock()) try { + val newDetector: FailureDetector = detectorFactory() + newDetector.heartbeat() + resourceToFailureDetector.set(oldTable + (resource -> newDetector)) + } finally failureDectorCreationLock.unlock() + else heartbeat(resource) // The thread that lost the race will try to reread } - - doHeartbeat(resource, None) } - final override def remove(resource: A): Unit = { + @tailrec final override def remove(resource: A): Unit = { - @tailrec - def doRemove(resource: A): Unit = { - val oldTable = table.get + val oldTable = resourceToFailureDetector.get - if (oldTable.contains(resource)) { - val newTable = oldTable - resource + if (oldTable.contains(resource)) { + val newTable = oldTable - resource - // if we won the race then update else try again - if (!table.compareAndSet(oldTable, newTable)) doRemove(resource) // recur - } - } - - doRemove(resource) - } - - final override def reset(): Unit = { - - @tailrec - def doReset(): Unit = { - val oldTable = table.get // if we won the race then update else try again - if (!table.compareAndSet(oldTable, Map.empty[A, FailureDetector])) doReset() // recur + if (!resourceToFailureDetector.compareAndSet(oldTable, newTable)) remove(resource) // recur } + } + + @tailrec final override def reset(): Unit = { + + val oldTable = resourceToFailureDetector.get + // if we won the race then update else try again + if (!resourceToFailureDetector.compareAndSet(oldTable, Map.empty[A, FailureDetector])) reset() // recur - doReset() } } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 2fe6b7ed2b..2b45c7be92 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -14,18 +14,20 @@ import akka.serialization.Serialization import akka.util.ByteString import java.net.URLEncoder import scala.util.control.NonFatal -import akka.actor.SupervisorStrategy.{ Restart, Stop } -trait InboundMessageDispatcher { +/** + * Internal API + */ +private[remote] trait InboundMessageDispatcher { def dispatch(recipient: InternalActorRef, recipientAddress: Address, serializedMessage: MessageProtocol, senderOption: Option[ActorRef]): Unit } -class DefaultMessageDispatcher(private val system: ExtendedActorSystem, - private val provider: RemoteActorRefProvider, - private val log: LoggingAdapter) extends InboundMessageDispatcher { +private[remote] class DefaultMessageDispatcher(private val system: ExtendedActorSystem, + private val provider: RemoteActorRefProvider, + private val log: LoggingAdapter) extends InboundMessageDispatcher { private val remoteDaemon = provider.remoteDaemon @@ -37,11 +39,11 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem, import provider.remoteSettings._ lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage) - lazy val payloadClass: Class[_] = if (payload eq null) null else payload.getClass + def payloadClass: Class[_] = if (payload eq null) null else payload.getClass val sender: ActorRef = senderOption.getOrElse(system.deadLetters) val originalReceiver = recipient.path - lazy val msgLog = s"RemoteMessage: [$payload] to [$recipient]<+[$originalReceiver] from [$sender]" + def msgLog = s"RemoteMessage: [$payload] to [$recipient]<+[$originalReceiver] from [$sender]" recipient match { @@ -83,8 +85,17 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem, } -object EndpointWriter { +/** + * Internal API + */ +private[remote] object EndpointWriter { + /** + * This message signals that the current association maintained by the local EndpointWriter and EndpointReader is + * to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the + * same remote endpoint. + * @param handle + */ case class TakeOver(handle: AssociationHandle) case object BackoffTimer @@ -94,8 +105,8 @@ object EndpointWriter { case object Writing extends State } -class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) -case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) +private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) +private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) extends EndpointException("Invalid address: " + remoteAddress, cause) private[remote] class EndpointWriter( @@ -112,37 +123,34 @@ private[remote] class EndpointWriter( val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem] val eventPublisher = new EventPublisher(context.system, log, settings.LogLifecycleEvents) - var reader: ActorRef = null - var handle: AssociationHandle = handleOrActive.getOrElse(null) - var inbound = false - var readerId = 0 + var reader: Option[ActorRef] = None + var handle: Option[AssociationHandle] = handleOrActive // FIXME: refactor into state data + val readerId = Iterator from 0 override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) } val msgDispatch = new DefaultMessageDispatcher(extendedSystem, extendedSystem.provider.asInstanceOf[RemoteActorRefProvider], log) - private def publishAndThrow(reason: Throwable): Nothing = { - eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound)) - throw reason - } + def inbound = handle.isDefined - private def publishAndThrow(message: String, cause: Throwable): Nothing = - publishAndThrow(new EndpointException(message, cause)) + private def publishAndThrow(reason: Throwable): Nothing = + try + // FIXME: Casting seems very evil here... + eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound)).asInstanceOf[Nothing] + finally throw reason override def postRestart(reason: Throwable): Unit = { - handle = null // Wipe out the possibly injected handle + handle = None // Wipe out the possibly injected handle preStart() } override def preStart(): Unit = { - if (handle eq null) { + if (!inbound) { transport.associate(remoteAddress) pipeTo self - inbound = false startWith(Initializing, ()) } else { startReadEndpoint() - inbound = true startWith(Writing, ()) } } @@ -150,15 +158,16 @@ private[remote] class EndpointWriter( when(Initializing) { case Event(Send(msg, senderOption, recipient), _) ⇒ stash() - stay + stay() case Event(Transport.Invalid(e), _) ⇒ log.error(e, "Tried to associate with invalid remote address [{}]. " + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) - case Event(Transport.Fail(e), _) ⇒ publishAndThrow(s"Association failed with [$remoteAddress]", e) + case Event(Transport.Fail(e), _) ⇒ + publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e)) case Event(Transport.Ready(inboundHandle), _) ⇒ - handle = inboundHandle + handle = Some(inboundHandle) startReadEndpoint() goto(Writing) @@ -167,7 +176,7 @@ private[remote] class EndpointWriter( when(Buffering) { case Event(Send(msg, senderOption, recipient), _) ⇒ stash() - stay + stay() case Event(BackoffTimer, _) ⇒ goto(Writing) } @@ -175,26 +184,31 @@ private[remote] class EndpointWriter( when(Writing) { case Event(Send(msg, senderOption, recipient), _) ⇒ val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption) - val success = try handle.write(pdu) catch { - case NonFatal(e) ⇒ publishAndThrow("Failed to write message to the transport", e) + val success = try handle match { + case Some(h) ⇒ h.write(pdu) + case None ⇒ throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" + + "handle is present.", null) + } catch { + case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e)) } - if (success) stay else { + if (success) stay() else { stash() goto(Buffering) } } whenUnhandled { - case Event(Terminated(r), _) if r == reader ⇒ publishAndThrow("Disassociated", null) + case Event(Terminated(r), _) if r == reader ⇒ publishAndThrow(new EndpointException("Disassociated", null)) case Event(TakeOver(newHandle), _) ⇒ // Shutdown old reader - if (handle ne null) handle.disassociate() - if (reader ne null) { - context.unwatch(reader) - context.stop(reader) + handle foreach { _.disassociate() } + reader match { + case Some(r) ⇒ + context.unwatch(r) + context.stop(r) + case None ⇒ } - handle = newHandle - inbound = true + handle = Some(newHandle) startReadEndpoint() unstashAll() goto(Writing) @@ -204,7 +218,8 @@ private[remote] class EndpointWriter( case Initializing -> Writing ⇒ unstashAll() eventPublisher.notifyListeners(AssociatedEvent(localAddress, remoteAddress, inbound)) - case Writing -> Buffering ⇒ setTimer("backoff-timer", BackoffTimer, settings.BackoffPeriod, false) + case Writing -> Buffering ⇒ + setTimer("backoff-timer", BackoffTimer, settings.BackoffPeriod, repeat = false) case Buffering -> Writing ⇒ unstashAll() cancelTimer("backoff-timer") @@ -212,24 +227,31 @@ private[remote] class EndpointWriter( onTermination { case StopEvent(_, _, _) ⇒ if (handle ne null) { + // FIXME: Add a test case for this + // It is important to call unstashAll() for the stash to work properly and maintain messages during restart. + // As the FSM trait does not call super.postStop(), this call is needed unstashAll() - handle.disassociate() + handle foreach { _.disassociate() } eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound)) } } - private def startReadEndpoint(): Unit = { - reader = context.actorOf(Props(new EndpointReader(codec, handle.localAddress, msgDispatch)), - "endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8")) - readerId += 1 - handle.readHandlerPromise.success(reader) - context.watch(reader) + private def startReadEndpoint(): Unit = handle match { + case Some(h) ⇒ + reader = Some(context.watch(context.actorOf(Props(new EndpointReader(codec, h.localAddress, msgDispatch)), + "endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + readerId.next()))) + h.readHandlerPromise.success(reader.get) + case None ⇒ throw new EndpointException("Internal error: No handle was present during creation of the endpoint" + + "reader.", null) } - private def serializeMessage(msg: Any): MessageProtocol = { - Serialization.currentTransportAddress.withValue(handle.localAddress) { - (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])) - } + private def serializeMessage(msg: Any): MessageProtocol = handle match { + case Some(h) ⇒ + Serialization.currentTransportAddress.withValue(h.localAddress) { + (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])) + } + case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of" + + "outbound message.", null) } } diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index d51c2b90ba..1c12bec87f 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -4,6 +4,7 @@ import akka.remote.FailureDetector.Clock import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.concurrent.duration.FiniteDuration +import scala.collection.immutable /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: @@ -68,11 +69,9 @@ class PhiAccrualFailureDetector( * Implement using optimistic lockless concurrency, all state is represented * by this immutable case class and managed by an AtomicReference. */ - private case class State( - history: HeartbeatHistory = firstHeartbeat, - timestamp: Option[Long] = None) + private case class State(history: HeartbeatHistory, timestamp: Option[Long]) - private val state = new AtomicReference[State](State()) + private val state = new AtomicReference[State](State(history = firstHeartbeat, timestamp = None)) override def isAvailable: Boolean = phi < threshold @@ -123,10 +122,8 @@ class PhiAccrualFailureDetector( } } - private[akka] def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double = { - val cdf = cumulativeDistributionFunction(timeDiff, mean, stdDeviation) - -math.log10(1.0 - cdf) - } + private[akka] def phi(timeDiff: Long, mean: Double, stdDeviation: Double): Double = + -math.log10(1.0 - cumulativeDistributionFunction(timeDiff, mean, stdDeviation)) private val minStdDeviationMillis = minStdDeviation.toMillis @@ -154,7 +151,7 @@ private[akka] object HeartbeatHistory { */ def apply(maxSampleSize: Int): HeartbeatHistory = HeartbeatHistory( maxSampleSize = maxSampleSize, - intervals = IndexedSeq.empty, + intervals = immutable.IndexedSeq.empty, intervalSum = 0L, squaredIntervalSum = 0L) @@ -169,12 +166,14 @@ private[akka] object HeartbeatHistory { */ private[akka] case class HeartbeatHistory private ( maxSampleSize: Int, - intervals: IndexedSeq[Long], + intervals: immutable.IndexedSeq[Long], intervalSum: Long, squaredIntervalSum: Long) { if (maxSampleSize < 1) throw new IllegalArgumentException(s"maxSampleSize must be >= 1, got [$maxSampleSize]") + if (intervals.size == 0) + throw new IllegalArgumentException("intervals.size must be > 0") if (intervalSum < 0L) throw new IllegalArgumentException(s"intervalSum must be >= 0, got [$intervalSum]") if (squaredIntervalSum < 0L) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index e2577f3804..ad6fb98ff9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -161,7 +161,7 @@ class RemoteActorRefProvider( Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match { case d @ Deploy(_, _, _, RemoteScope(addr)) ⇒ - if (isSelfAddress(addr)) { + if (hasAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else { try { @@ -181,7 +181,7 @@ class RemoteActorRefProvider( } def actorFor(path: ActorPath): InternalActorRef = { - if (isSelfAddress(path.address)) actorFor(rootGuardian, path.elements) + if (hasAddress(path.address)) actorFor(rootGuardian, path.elements) else try { new RemoteActorRef(this, transport, transport.localAddressForRemote(path.address), path, Nobody, props = None, deploy = None) @@ -194,7 +194,7 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ - if (isSelfAddress(address)) actorFor(rootGuardian, elems) + if (hasAddress(address)) actorFor(rootGuardian, elems) else new RemoteActorRef(this, transport, transport.localAddressForRemote(address), new RootActorPath(address) / elems, Nobody, props = None, deploy = None) case _ ⇒ local.actorFor(ref, path) @@ -207,7 +207,7 @@ class RemoteActorRefProvider( */ def actorForWithLocalAddress(ref: InternalActorRef, path: String, localAddress: Address): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ - if (isSelfAddress(address)) actorFor(rootGuardian, elems) + if (hasAddress(address)) actorFor(rootGuardian, elems) else new RemoteActorRef(this, transport, localAddress, new RootActorPath(address) / elems, Nobody, props = None, deploy = None) case _ ⇒ local.actorFor(ref, path) @@ -227,13 +227,13 @@ class RemoteActorRefProvider( def getExternalAddressFor(addr: Address): Option[Address] = { addr match { - case _ if isSelfAddress(addr) ⇒ Some(local.rootPath.address) + case _ if hasAddress(addr) ⇒ Some(local.rootPath.address) case Address("akka", _, Some(_), Some(_)) ⇒ Some(transport.localAddressForRemote(addr)) case _ ⇒ None } } - private def isSelfAddress(address: Address): Boolean = + private def hasAddress(address: Address): Boolean = address == local.rootPath.address || address == rootPath.address || transport.addresses(address) } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 7270e2d1bf..d26bec609e 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -160,9 +160,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc _.toSet } - endpointManager ! StartupFinished - addresses = transports.map { _._2 }.toSet + + endpointManager ! StartupFinished eventPublisher.notifyListeners(RemotingListenEvent(addresses)) } catch { @@ -186,9 +186,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc // Ignore } - override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { + // FIXME: Keep senders down the stack + override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager.tell(Send(message, senderOption, recipient), sender = Actor.noSender) - } override def managementCommand(cmd: Any): Future[Boolean] = { val statusPromise = Promise[Boolean]() @@ -371,7 +371,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.registerPassiveEndpoint(handle.remoteAddress, endpoint) else handle.disassociate() } - case Terminated(endpoint) ⇒ endpoints.removeIfNotGated(endpoint); + case Terminated(endpoint) ⇒ endpoints.removeIfNotGated(endpoint) case Prune ⇒ endpoints.prune(settings.RetryGateClosedFor) } @@ -397,7 +397,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec) - } val listens: Future[Seq[(Transport, (Address, Promise[AssociationEventListener]))]] = Future.sequence( @@ -426,9 +425,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends private def createEndpoint(remoteAddress: Address, localAddress: Address, handleOption: Option[AssociationHandle]): ActorRef = { - assert(transportMapping contains (localAddress)) + assert(transportMapping contains localAddress) - val endpoint = context.actorOf(Props( + context.watch(context.actorOf(Props( new EndpointWriter( handleOption, localAddress, @@ -437,9 +436,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends settings, AkkaPduProtobufCodec)) .withDispatcher("akka.remoting.writer-dispatcher"), - "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next()) - - context.watch(endpoint) + "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next())) } @@ -450,7 +447,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transportMapping.values foreach { transport ⇒ try transport.shutdown() catch { case NonFatal(e) ⇒ - log.error(e, s"Unable to shut down the underlying Transport: [$transport]") + log.error(e, s"Unable to shut down the underlying transport: [$transport]") } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala index 40eb1a03a4..9cb6d40e39 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala @@ -6,11 +6,11 @@ import scala.beans.BeanProperty import java.util.{ Set ⇒ JSet } import scala.collection.JavaConverters.setAsJavaSetConverter -trait RemotingLifecycleEvent extends Serializable { +sealed trait RemotingLifecycleEvent extends Serializable { def logLevel: Logging.LogLevel } -trait AssociationEvent extends RemotingLifecycleEvent { +sealed trait AssociationEvent extends RemotingLifecycleEvent { def localAddress: Address def remoteAddress: Address def inbound: Boolean @@ -21,7 +21,7 @@ trait AssociationEvent extends RemotingLifecycleEvent { override def toString: String = s"$eventName [$localAddress]${if (inbound) " <- " else " -> "}[$remoteAddress]" } -case class AssociatedEvent( +final case class AssociatedEvent( localAddress: Address, remoteAddress: Address, inbound: Boolean) @@ -32,7 +32,7 @@ case class AssociatedEvent( } -case class DisassociatedEvent( +final case class DisassociatedEvent( localAddress: Address, remoteAddress: Address, inbound: Boolean) @@ -41,7 +41,7 @@ case class DisassociatedEvent( override def logLevel: Logging.LogLevel = Logging.DebugLevel } -case class AssociationErrorEvent( +final case class AssociationErrorEvent( cause: Throwable, localAddress: Address, remoteAddress: Address, @@ -52,8 +52,8 @@ case class AssociationErrorEvent( def getCause: Throwable = cause } -case class RemotingListenEvent(listenAddresses: Set[Address]) extends RemotingLifecycleEvent { - final def getListenAddresses: JSet[Address] = listenAddresses.asJava +final case class RemotingListenEvent(listenAddresses: Set[Address]) extends RemotingLifecycleEvent { + def getListenAddresses: JSet[Address] = listenAddresses.asJava override def logLevel: Logging.LogLevel = Logging.InfoLevel override def toString: String = "Remoting now listens on addresses: " + listenAddresses.mkString("[", ", ", "]") } @@ -63,7 +63,7 @@ case object RemotingShutdownEvent extends RemotingLifecycleEvent { override val toString: String = "Remoting shut down" } -case class RemotingErrorEvent(@BeanProperty cause: Throwable) extends RemotingLifecycleEvent { +final case class RemotingErrorEvent(@BeanProperty cause: Throwable) extends RemotingLifecycleEvent { override def logLevel: Logging.LogLevel = Logging.ErrorLevel override def toString: String = s"Remoting error: [${Logging.stackTraceFor(cause)}]" } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index ded169dec2..99cdd99fcc 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -86,14 +86,9 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { override def decodePdu(raw: ByteString): AkkaPdu = { try { val pdu = AkkaRemoteProtocol.parseFrom(raw.toArray) - - if (pdu.hasPayload) { - Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer())) - } else if (pdu.hasInstruction) { - decodeControlPdu(pdu.getInstruction) - } else { - throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null) - } + if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer())) + else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction) + else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null) } catch { case e: InvalidProtocolBufferException ⇒ throw new PduCodecException("Decoding PDU failed.", e) } @@ -108,9 +103,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { recipient = provider.actorForWithLocalAddress(provider.rootGuardian, msgPdu.getRecipient.getPath, localAddress), recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath), serializedMessage = msgPdu.getMessage, - senderOption = (if (msgPdu.hasSender) - Some(provider.actorForWithLocalAddress(provider.rootGuardian, msgPdu.getSender.getPath, localAddress)) - else None)) + senderOption = if (!msgPdu.hasSender) None + else Some(provider.actorForWithLocalAddress(provider.rootGuardian, msgPdu.getSender.getPath, localAddress))) } private def decodeControlPdu(controlPdu: RemoteControlProtocol): AkkaPdu = { @@ -135,7 +129,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { val controlMessageBuilder = RemoteControlProtocol.newBuilder() controlMessageBuilder.setCommandType(code) - cookie foreach { controlMessageBuilder.setCookie(_) } + cookie foreach controlMessageBuilder.setCookie for (originAddress ← origin; serialized ← serializeAddress(originAddress)) controlMessageBuilder.setOrigin(serialized) @@ -143,12 +137,9 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { } private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefProtocol = { - val fullActorRefString: String = if (ref.path.address.host.isDefined) - ref.path.toString - else - ref.path.toStringWithAddress(defaultAddress) - - ActorRefProtocol.newBuilder.setPath(fullActorRefString).build() + ActorRefProtocol.newBuilder.setPath( + if(ref.path.address.host.isDefined) ref.path.toString else ref.path.toStringWithAddress(defaultAddress) + ).build() } private def serializeAddress(address: Address): Option[AddressProtocol] = { diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 9d1620cf43..1517c495db 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -17,7 +17,7 @@ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal import scala.util.{ Success, Failure } import java.net.URLEncoder -import scala.collection.immutable.Queue +import scala.collection.immutable import akka.remote.transport.ActorTransportAdapter._ class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) @@ -223,7 +223,7 @@ private[transport] object ProtocolStateActor { // Both transports are associated, but the handler for the handle has not yet been provided case class AssociatedWaitHandler(handleListener: Future[HandleEventListener], wrappedHandle: AssociationHandle, - queue: Queue[ByteString]) + queue: immutable.Queue[ByteString]) extends ProtocolStateData case class ListenerReady(listener: HandleEventListener, wrappedHandle: AssociationHandle) @@ -316,16 +316,16 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Payload(payload) ⇒ sendHeartbeat(wrappedHandle) goto(Open) using - AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue(payload)) + AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue(payload)) case Heartbeat ⇒ sendHeartbeat(wrappedHandle) failureDetector.heartbeat() goto(Open) using - AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) + AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue.empty) case _ ⇒ goto(Open) using - AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) + AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, immutable.Queue.empty) } case Event(HeartbeatTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) ⇒ handleTimers(wrappedHandle) @@ -343,7 +343,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat failureDetector.heartbeat() initTimers() - goto(Open) using AssociatedWaitHandler(notifyInboundHandler(wrappedHandle, origin, associationHandler), wrappedHandle, Queue.empty) + goto(Open) using AssociatedWaitHandler(notifyInboundHandler(wrappedHandle, origin, associationHandler), wrappedHandle, immutable.Queue.empty) } else { stop() } @@ -431,9 +431,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case StopEvent(_, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒ // Invalidate exposed but still unfinished promise. The underlying association disappeared, so after // registration immediately signal a disassociate - handlerFuture.onSuccess { - case listener: HandleEventListener ⇒ listener notify Disassociated - } + handlerFuture foreach { _ notify Disassociated } case StopEvent(_, _, ListenerReady(handler, wrappedHandle)) ⇒ handler notify Disassociated diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 8ade317dd5..7a5798069b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -1,11 +1,11 @@ package akka.remote.transport -import concurrent.{ Promise, Future } +import scala.concurrent.{ Promise, Future } import akka.actor.{ ActorRef, Address } import akka.util.ByteString -import akka.remote.transport.Transport.AssociationEvent import akka.remote.transport.AssociationHandle.HandleEventListener + object Transport { trait AssociationEvent diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index 0c768e9902..7d826c3776 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -21,7 +21,7 @@ private[netty] trait NettyHelpers { val cause = if (ev.getCause ne null) ev.getCause else new AkkaException("Unknown cause") cause match { case _: ClosedChannelException ⇒ // Ignore - case NonFatal(e) ⇒ onException(ctx, ev) + case null | NonFatal(e) ⇒ onException(ctx, ev) case e: Throwable ⇒ throw e // Rethrow fatals } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 54497a447b..80cfd1e099 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -150,6 +150,7 @@ abstract class ClientHandler(protected final val transport: NettyTransport, } private[transport] object NettyTransport { + // 4 bytes will be used to represent the frame length. Used by netty LengthFieldPrepender downstream handler. val FrameLengthFieldLength = 4 def gracefulClose(channel: Channel): Unit = channel.disconnect().addListener(ChannelFutureListener.CLOSE)