diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 51a4174346..69fa271b9a 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -95,7 +95,7 @@ akka { startup-timeout = 5 s # FIXME document - retry-latch-closed-for = 0 s + retry-gate-closed-for = 0 s # FIXME document retry-window = 3 s diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 6cf2f0c88a..68c06e96ea 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -41,7 +41,7 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem, val sender: ActorRef = senderOption.getOrElse(system.deadLetters) val originalReceiver = recipient.path - lazy val msgLog = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender + lazy val msgLog = s"RemoteMessage: [$payload] to [$recipient]<+[$originalReceiver] from [$sender]" recipient match { @@ -115,11 +115,7 @@ private[remote] class EndpointWriter( var inbound = false var readerId = 0 - override val supervisorStrategy = OneForOneStrategy() { - case NonFatal(e) ⇒ - publishAndThrow(e) - Stop - } + override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) } val msgDispatch = new DefaultMessageDispatcher(extendedSystem, extendedSystem.provider.asInstanceOf[RemoteActorRefProvider], log) @@ -154,11 +150,11 @@ private[remote] class EndpointWriter( stash() stay case Event(Transport.Invalid(e), _) ⇒ - log.error(e, "Tried to associate with invalid remote address " + remoteAddress + - ". Address is now quarantined, all messages to this address will be delivered to dead letters.") + log.error(e, "Tried to associate with invalid remote address [{}]. " + + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) - case Event(Transport.Fail(e), _) ⇒ publishAndThrow(s"Association failed with $remoteAddress", e) + case Event(Transport.Fail(e), _) ⇒ publishAndThrow(s"Association failed with [$remoteAddress]", e) case Event(Transport.Ready(inboundHandle), _) ⇒ handle = inboundHandle startReadEndpoint() diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 94aeb65349..8f1c440157 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -166,7 +166,6 @@ class RemoteActorRefProvider( } else { val localAddress = transport.localAddressForRemote(addr) val rpath = RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements - useActorOnNode(rpath, props, d, supervisor) new RemoteActorRef(this, transport, localAddress, rpath, supervisor, Some(props), Some(d)) } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 84733431a3..e5b050b223 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -4,8 +4,7 @@ import akka.actor.SupervisorStrategy._ import akka.actor._ import akka.event.{ Logging, LoggingAdapter } import akka.pattern.gracefulStop -import akka.remote.EndpointManager.Listen -import akka.remote.EndpointManager.Send +import akka.remote.EndpointManager.{ StartupFinished, Listen, Send } import akka.remote.transport.Transport.InboundAssociation import akka.remote.transport._ import akka.util.Timeout @@ -31,7 +30,7 @@ class RemotingSettings(config: Config) { val StartupTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS) - val RetryLatchClosedFor: Long = getMilliseconds("akka.remoting.retry-latch-closed-for") + val RetryGateClosedFor: Long = getMilliseconds("akka.remoting.retry-gate-closed-for") val UsePassiveConnections: Boolean = getBoolean("akka.remoting.use-passive-connections") @@ -50,18 +49,18 @@ class RemotingSettings(config: Config) { private[remote] object Remoting { - val EndpointManagerName = "remoteTransportHeadActor" + final val EndpointManagerName = "endpointManager" def localAddressForRemote(transportMapping: Map[String, Set[(Transport, Address)]], remote: Address): Address = { transportMapping.get(remote.protocol) match { case Some(transports) ⇒ - val responsibleTransports = transports.filter(_._1.isResponsibleFor(remote)) + val responsibleTransports = transports.filter { case (t, _) ⇒ t.isResponsibleFor(remote) } responsibleTransports.size match { case 0 ⇒ throw new RemoteTransportException( - s"No transport is responsible for address: ${remote} although protocol ${remote.protocol} is available." + + s"No transport is responsible for address: [${remote}] although protocol [${remote.protocol}] is available." + " Make sure at least one transport is configured to be responsible for the address.", null) @@ -70,7 +69,7 @@ private[remote] object Remoting { case _ ⇒ throw new RemoteTransportException( - s"Multiple transports are available for ${remote}: ${responsibleTransports.mkString(",")}. " + + s"Multiple transports are available for [${remote}]: [${responsibleTransports.mkString(",")}]. " + "Remoting cannot decide which transport to use to reach the remote system. Change your configuration " + "so that only one transport is responsible for the address.", null) @@ -135,6 +134,8 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc _.toSet } + endpointManager ! StartupFinished + addresses = transports.map { _._2 }.toSet eventPublisher.notifyListeners(RemotingListenEvent(addresses)) @@ -175,6 +176,7 @@ private[remote] object EndpointManager { sealed trait RemotingCommand case class Listen(addressesPromise: Promise[Set[(Transport, Address)]]) extends RemotingCommand + case object StartupFinished extends RemotingCommand case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand { override def toString = s"Remote message $senderOption -> $recipient" @@ -182,7 +184,7 @@ private[remote] object EndpointManager { sealed trait EndpointPolicy case class Pass(endpoint: ActorRef) extends EndpointPolicy - case class Latched(timeOfFailure: Long) extends EndpointPolicy + case class Gated(timeOfFailure: Long) extends EndpointPolicy case class Quarantined(reason: Throwable) extends EndpointPolicy case object Prune @@ -202,10 +204,15 @@ private[remote] object EndpointManager { def passiveEndpointFor(address: Address): Option[ActorRef] = addressToPassive.get(address) + def isQuarantined(address: Address): Boolean = addressToEndpointAndPolicy.get(address) match { + case Some(Quarantined(_)) ⇒ true + case _ ⇒ false + } + def prune(pruneAge: Long): Unit = { addressToEndpointAndPolicy = addressToEndpointAndPolicy.filter { - case (_, Latched(timeOfFailure)) ⇒ timeOfFailure + pruneAge > System.nanoTime() - case _ ⇒ true + case (_, Gated(timeOfFailure)) ⇒ timeOfFailure + pruneAge > System.nanoTime() + case _ ⇒ true } } @@ -222,14 +229,14 @@ private[remote] object EndpointManager { } def markFailed(endpoint: ActorRef, timeOfFailure: Long): Unit = { - addressToEndpointAndPolicy += endpointToAddress(endpoint) -> Latched(timeOfFailure) + addressToEndpointAndPolicy += endpointToAddress(endpoint) -> Gated(timeOfFailure) endpointToAddress = endpointToAddress - endpoint } def markQuarantine(address: Address, reason: Throwable): Unit = addressToEndpointAndPolicy += address -> Quarantined(reason) - def removeIfNotLatched(endpoint: ActorRef): Unit = { + def removeIfNotGated(endpoint: ActorRef): Unit = { endpointToAddress.get(endpoint) foreach { address ⇒ addressToEndpointAndPolicy.get(address) foreach { policy ⇒ policy match { @@ -251,7 +258,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val settings = new RemotingSettings(conf) val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem] - var endpointId: Long = 0L + val endpointId: Iterator[Int] = Iterator from 0 val eventPublisher = new EventPublisher(context.system, log, settings.LogLifecycleEvents) @@ -261,9 +268,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends // Mapping between transports and the local addresses they listen to var transportMapping: Map[Address, Transport] = Map() - val retryLatchEnabled = settings.RetryLatchClosedFor > 0L - val pruneInterval: Long = if (retryLatchEnabled) settings.RetryLatchClosedFor * 2L else 0L - val pruneTimerCancellable: Option[Cancellable] = if (retryLatchEnabled) + val retryGateEnabled = settings.RetryGateClosedFor > 0L + val pruneInterval: Long = if (retryGateEnabled) settings.RetryGateClosedFor * 2L else 0L + val pruneTimerCancellable: Option[Cancellable] = if (retryGateEnabled) Some(context.system.scheduler.schedule(pruneInterval milliseconds, pruneInterval milliseconds, self, Prune)) else None @@ -273,13 +280,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Stop case NonFatal(e) ⇒ - if (!retryLatchEnabled) + if (!retryGateEnabled) // This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue // to the restarted endpoint -- thus no messages are lost Restart else { // This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure, - // keeps throwing away messages until the retry latch becomes open (time specified in RetryLatchClosedFor) + // keeps throwing away messages until the retry gate becomes open (time specified in RetryGateClosedFor) endpoints.markFailed(sender, System.nanoTime()) Stop } @@ -291,6 +298,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends addressesPromise.failure(e) context.stop(self) } + + case StartupFinished ⇒ context.become(accepting) } val accepting: Receive = { @@ -299,7 +308,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.getEndpointWithPolicy(recipientAddress) match { case Some(Pass(endpoint)) ⇒ endpoint ! s - case Some(Latched(timeOfFailure)) ⇒ if (retryLatchOpen(timeOfFailure)) { + case Some(Gated(timeOfFailure)) ⇒ if (retryGateOpen(timeOfFailure)) { val endpoint = createEndpoint(recipientAddress, recipientRef.localAddressToUse, None) endpoints.registerActiveEndpoint(recipientAddress, endpoint) endpoint ! s @@ -319,10 +328,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, true)) if (settings.UsePassiveConnections && !endpoints.hasActiveEndpointFor(handle.remoteAddress)) { endpoints.registerActiveEndpoint(handle.remoteAddress, endpoint) - } else endpoints.registerPassiveEndpoint(handle.remoteAddress, endpoint) + } else if (!endpoints.isQuarantined(handle.remoteAddress)) + endpoints.registerPassiveEndpoint(handle.remoteAddress, endpoint) + else handle.disassociate() } - case Terminated(endpoint) ⇒ endpoints.removeIfNotLatched(endpoint); - case Prune ⇒ endpoints.prune(settings.RetryLatchClosedFor) + case Terminated(endpoint) ⇒ endpoints.removeIfNotGated(endpoint); + case Prune ⇒ endpoints.prune(settings.RetryGateClosedFor) } private def initializeTransports(addressesPromise: Promise[Set[(Transport, Address)]]): Unit = { @@ -330,7 +341,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config) - val wrappedTransport = context.system.asInstanceOf[ActorSystemImpl].dynamicAccess + val wrappedTransport = extendedSystem.dynamicAccess .createInstanceFor[Transport](fqn, args).recover({ case exception ⇒ throw new IllegalArgumentException( @@ -345,16 +356,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } val listens: Future[Seq[(Transport, (Address, Promise[ActorRef]))]] = Future.sequence( - transports.map { transport ⇒ transport.listen.map { transport -> _ } }) + transports.map { transport ⇒ transport.listen map (transport -> _) }) listens.onComplete { case Success(results) ⇒ - transportMapping = HashMap() ++ results.groupBy { case (_, (transportAddress, _)) ⇒ transportAddress }.map { - case (a, t) ⇒ - if (t.size > 1) - throw new RemoteTransportException(s"There are more than one transports listening on local address $a", null) - - a -> t.head._1 + transportMapping = results.groupBy { + case (_, (transportAddress, _)) ⇒ transportAddress + } map { + case (a, t) if t.size > 1 ⇒ + throw new RemoteTransportException(s"There are more than one transports listening on local address [$a]", null) + case (a, t) ⇒ a -> t.head._1 } val transportsAndAddresses = (for ((transport, (address, promise)) ← results) yield { @@ -363,8 +374,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends }).toSet addressesPromise.success(transportsAndAddresses) - context.become(accepting) - case Failure(reason) ⇒ addressesPromise.failure(reason) } } @@ -372,9 +381,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends private def createEndpoint(remoteAddress: Address, localAddress: Address, handleOption: Option[AssociationHandle]): ActorRef = { - assert(transportMapping.contains(localAddress)) - val id = endpointId - endpointId += 1L + assert(transportMapping contains (localAddress)) val endpoint = context.actorOf(Props( new EndpointWriter( @@ -385,21 +392,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends settings, AkkaPduProtobufCodec)) .withDispatcher("akka.remoting.writer-dispatcher"), - "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId) + "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next()) context.watch(endpoint) // TODO: see what to do with this } - private def retryLatchOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryLatchClosedFor) < System.nanoTime() + private def retryGateOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryGateClosedFor) < System.nanoTime() override def postStop(): Unit = { pruneTimerCancellable.foreach { _.cancel() } transportMapping.values foreach { transport ⇒ - try transport.shutdown() - catch { + try transport.shutdown() catch { case NonFatal(e) ⇒ - log.error(e, s"Unable to shut down the underlying Transport: $transport") + log.error(e, s"Unable to shut down the underlying Transport: [$transport]") } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala index 3990eca79d..40eb1a03a4 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala @@ -24,27 +24,32 @@ trait AssociationEvent extends RemotingLifecycleEvent { case class AssociatedEvent( localAddress: Address, remoteAddress: Address, - inbound: Boolean) extends AssociationEvent { + inbound: Boolean) + extends AssociationEvent { + protected override val eventName: String = "Associated" override def logLevel: Logging.LogLevel = Logging.DebugLevel + } case class DisassociatedEvent( localAddress: Address, remoteAddress: Address, - inbound: Boolean) extends AssociationEvent { + inbound: Boolean) + extends AssociationEvent { protected override val eventName: String = "Disassociated" override def logLevel: Logging.LogLevel = Logging.DebugLevel } case class AssociationErrorEvent( - @BeanProperty cause: Throwable, + cause: Throwable, localAddress: Address, remoteAddress: Address, inbound: Boolean) extends AssociationEvent { protected override val eventName: String = "AssociationError" override def logLevel: Logging.LogLevel = Logging.ErrorLevel override def toString: String = s"${super.toString}: Error[${Logging.stackTraceFor(cause)}]" + def getCause: Throwable = cause } case class RemotingListenEvent(listenAddresses: Set[Address]) extends RemotingLifecycleEvent { diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index 91459926dc..bbac7e9619 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -15,7 +15,7 @@ import com.typesafe.config.Config import akka.japi.Util._ import akka.ConfigurationException -private[akka] class SslSettings(config: Config) { +private[akka] class SSLSettings(config: Config) { import config._ val SSLKeyStore = Option(getString("key-store")).filter(_.length > 0) @@ -54,7 +54,7 @@ private[akka] object NettySSLSupport { /** * Construct a SSLHandler which can be inserted into a Netty server/client pipeline */ - def apply(settings: SslSettings, log: LoggingAdapter, isClient: Boolean): SslHandler = + def apply(settings: SSLSettings, log: LoggingAdapter, isClient: Boolean): SslHandler = if (isClient) initializeClientSSL(settings, log) else initializeServerSSL(settings, log) def initializeCustomSecureRandom(rngName: Option[String], sourceOfRandomness: Option[String], log: LoggingAdapter): SecureRandom = { @@ -88,10 +88,10 @@ private[akka] object NettySSLSupport { rng } - def initializeClientSSL(settings: SslSettings, log: LoggingAdapter): SslHandler = { + def initializeClientSSL(settings: SSLSettings, log: LoggingAdapter): SslHandler = { log.debug("Client SSL is enabled, initialising ...") - def constructClientContext(settings: SslSettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = + def constructClientContext(settings: SSLSettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = try { val rng = initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log) val trustManagers: Array[TrustManager] = { @@ -137,10 +137,10 @@ private[akka] object NettySSLSupport { } } - def initializeServerSSL(settings: SslSettings, log: LoggingAdapter): SslHandler = { + def initializeServerSSL(settings: SSLSettings, log: LoggingAdapter): SslHandler = { log.debug("Server SSL is enabled, initialising ...") - def constructServerContext(settings: SslSettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = + def constructServerContext(settings: SSLSettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = try { val rng = initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log) val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index fa7204c79a..951ff31e59 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -99,7 +99,7 @@ private[akka] class NettySettings(config: Config, val systemName: String) { val ClientSocketWorkerPoolSize = computeWPS(config.getConfig("client-socket-worker-pool")) - val SslSettings = new SslSettings(config.getConfig("ssl")) + val SslSettings = new SSLSettings(config.getConfig("ssl")) val EnableSSL = getBoolean("ssl.enable") } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index 39db000b6b..ded169dec2 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -14,7 +14,7 @@ class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg private[remote] object AkkaPduCodec { /** - * Trait that represents decoded Akka PDUs + * Trait that represents decoded Akka PDUs (Protocol Data Units) */ sealed trait AkkaPdu @@ -30,7 +30,7 @@ private[remote] object AkkaPduCodec { } /** - * A Codec that is able to convert Akka PDUs from and to [[akka.util.ByteString]]s. + * A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s. */ private[remote] trait AkkaPduCodec { @@ -68,11 +68,11 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { senderOption foreach { ref ⇒ messageBuilder.setSender(serializeActorRef(localAddress, ref)) } messageBuilder.setMessage(serializedMessage) - akkaMessageProtocolToByteString(messageBuilder.build) + ByteString(messageBuilder.build.toByteArray) } - override def constructPayload(payload: ByteString): ByteString = akkaRemoteProtocolToByteString( - AkkaRemoteProtocol.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build) + override def constructPayload(payload: ByteString): ByteString = + ByteString(AkkaRemoteProtocol.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build.toByteArray) override def constructAssociate(cookie: Option[String], origin: Address): ByteString = constructControlMessagePdu(RemoteProtocol.CommandType.CONNECT, cookie, Some(origin)) @@ -139,14 +139,9 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { for (originAddress ← origin; serialized ← serializeAddress(originAddress)) controlMessageBuilder.setOrigin(serialized) - akkaRemoteProtocolToByteString(AkkaRemoteProtocol.newBuilder().setInstruction(controlMessageBuilder.build).build) + ByteString(AkkaRemoteProtocol.newBuilder().setInstruction(controlMessageBuilder.build).build.toByteArray) } - private def akkaRemoteProtocolToByteString(pdu: AkkaRemoteProtocol): ByteString = ByteString(pdu.toByteArray) - - private def akkaMessageProtocolToByteString(message: RemoteMessageProtocol): ByteString = - ByteString(message.toByteArray) - private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefProtocol = { val fullActorRefString: String = if (ref.path.address.host.isDefined) ref.path.toString diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 5aadb4f4fd..c274dbffe4 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -126,13 +126,14 @@ private[remote] class AkkaProtocolTransport( statusPromise.future } - override def shutdown(): Unit = { - manager ! PoisonPill - } + override def shutdown(): Unit = manager ! PoisonPill + } -private[transport] class AkkaProtocolManager(private val wrappedTransport: Transport, - private val settings: AkkaProtocolSettings) extends Actor { +private[transport] class AkkaProtocolManager( + private val wrappedTransport: Transport, + private val settings: AkkaProtocolSettings) + extends Actor { import context.dispatcher @@ -142,7 +143,7 @@ private[transport] class AkkaProtocolManager(private val wrappedTransport: Trans case NonFatal(_) ⇒ Stop } - private var nextId = 0L + private val nextId = Iterator from 0 private val associationHandlerPromise: Promise[ActorRef] = Promise() associationHandlerPromise.future.map { HandlerRegistered(_) } pipeTo self @@ -176,10 +177,8 @@ private[transport] class AkkaProtocolManager(private val wrappedTransport: Trans case InboundAssociation(handle) ⇒ handle.disassociate() } - private def actorNameFor(remoteAddress: Address): String = { - nextId += 1 - "akkaProtocol-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + nextId - } + private def actorNameFor(remoteAddress: Address): String = + "akkaProtocol-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + nextId.next() private def ready: Receive = { case InboundAssociation(handle) ⇒ @@ -202,7 +201,7 @@ private[transport] class AkkaProtocolManager(private val wrappedTransport: Trans createFailureDetector())), actorNameFor(remoteAddress)) } - private def createFailureDetector(): PhiAccrualFailureDetector = new PhiAccrualFailureDetector( + private def createFailureDetector(): FailureDetector = new PhiAccrualFailureDetector( settings.FailureDetectorThreshold, settings.FailureDetectorMaxSampleSize, settings.FailureDetectorStdDeviation, @@ -232,8 +231,27 @@ private[transport] class AkkaProtocolHandle( private[transport] object ProtocolStateActor { sealed trait AssociationState + + /* + * State when the underlying transport is not yet initialized + * State data can be OutboundUnassociated + */ case object Closed extends AssociationState + + /* + * State when the underlying transport is initialized, there is an association present, and we are waiting + * for the first message. Outbound connections can skip this phase if WaitActivity configuration parameter + * is turned off. + * State data can be OutboundUnderlyingAssociated (for outbound associations) or InboundUnassociated (for inbound + * when upper layer is not notified yet) + */ case object WaitActivity extends AssociationState + + /* + * State when the underlying transport is initialized and the handshake succeeded. + * If the upper layer did not yet provided a handler for incoming messages, state data is AssociatedWaitHandler. + * If everything is initialized, the state data is HandlerReady + */ case object Open extends AssociationState case object HeartbeatTimer @@ -241,7 +259,7 @@ private[transport] object ProtocolStateActor { sealed trait ProtocolStateData trait InitialProtocolStateData extends ProtocolStateData - // Nor the underlying, nor the provided transport is associated + // Neither the underlying, nor the provided transport is associated case class OutboundUnassociated(remoteAddress: Address, statusPromise: Promise[Status], transport: Transport) extends InitialProtocolStateData @@ -301,7 +319,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case d: InboundUnassociated ⇒ d.wrappedHandle.readHandlerPromise.success(self) - startWith(Closed, d) + startWith(WaitActivity, d) } when(Closed) { @@ -321,12 +339,46 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat failureDetector.heartbeat() initTimers() - if (settings.WaitActivityEnabled) { + if (settings.WaitActivityEnabled) goto(WaitActivity) using OutboundUnderlyingAssociated(statusPromise, wrappedHandle) - } else { + else goto(Open) using AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) + + case Event(DisassociateUnderlying, _) ⇒ + stop() + + case _ ⇒ stay() + + } + + // Timeout of this state is implicitly handled by the failure detector + when(WaitActivity) { + case Event(Disassociated, _) ⇒ + stop() + + case Event(InboundPayload(p), OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) ⇒ + decodePdu(p) match { + case Disassociate ⇒ + stop() + + // Any other activity is considered an implicit acknowledgement of the association + case Payload(payload) ⇒ + sendHeartbeat(wrappedHandle) + goto(Open) using + AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue(payload)) + + case Heartbeat ⇒ + sendHeartbeat(wrappedHandle) + failureDetector.heartbeat() + goto(Open) using + AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) + + case _ ⇒ goto(Open) using + AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) } + case Event(HeartbeatTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) ⇒ handleTimers(wrappedHandle) + // Events for inbound associations case Event(InboundPayload(p), InboundUnassociated(associationHandler, wrappedHandle)) ⇒ decodePdu(p) match { @@ -352,71 +404,27 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } - case Event(DisassociateUnderlying, _) ⇒ - stop() - - case _ ⇒ stay() - - } - - // Timeout of this state is implicitly handled by the failure detector - when(WaitActivity) { - case Event(Disassociated, OutboundUnderlyingAssociated(_, _)) ⇒ - stop() - - case Event(InboundPayload(p), OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) ⇒ - decodePdu(p) match { - case Disassociate ⇒ - stop() - - // Any other activity is considered an implicit acknowledgement of the association - case Payload(payload) ⇒ - sendHeartbeat(wrappedHandle) - goto(Open) using - AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue(payload)) - - case Heartbeat ⇒ - sendHeartbeat(wrappedHandle) - failureDetector.heartbeat() - goto(Open) using - AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) - - case _ ⇒ goto(Open) using - AssociatedWaitHandler(notifyOutboundHandler(wrappedHandle, statusPromise), wrappedHandle, Queue.empty) - } - - case Event(HeartbeatTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) ⇒ handleTimers(wrappedHandle) - } when(Open) { case Event(Disassociated, _) ⇒ stop() - case Event(InboundPayload(p), AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒ + case Event(InboundPayload(p), _) ⇒ decodePdu(p) match { case Disassociate ⇒ stop() case Heartbeat ⇒ failureDetector.heartbeat(); stay() - case Payload(payload) ⇒ - // Queue message until handler is registered - stay() using AssociatedWaitHandler(handlerFuture, wrappedHandle, queue :+ payload) - - case _ ⇒ stay() - } - - case Event(InboundPayload(p), HandlerReady(handler, _)) ⇒ - decodePdu(p) match { - case Disassociate ⇒ - stop() - - case Heartbeat ⇒ failureDetector.heartbeat(); stay() - - case Payload(payload) ⇒ - handler ! InboundPayload(payload) - stay() + case Payload(payload) ⇒ stateData match { + case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue) ⇒ + // Queue message until handler is registered + stay() using AssociatedWaitHandler(handlerFuture, wrappedHandle, queue :+ payload) + case HandlerReady(handler, _) ⇒ + handler ! InboundPayload(payload) + stay() + } case _ ⇒ stay() } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 518be776cc..b981d5c09e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -3,7 +3,7 @@ package akka.remote.transport.netty import akka.ConfigurationException import akka.actor.{ Address, ExtendedActorSystem, ActorRef } import akka.event.Logging -import akka.remote.netty.{ SslSettings, NettySSLSupport, DefaultDisposableChannelGroup } +import akka.remote.netty.{ SSLSettings, NettySSLSupport, DefaultDisposableChannelGroup } import akka.remote.transport.Transport._ import akka.remote.transport.netty.NettyTransportSettings.{ Udp, Tcp, Mode } import akka.remote.transport.{ AssociationHandle, Transport } @@ -37,7 +37,7 @@ class NettyTransportSettings(config: Config) { val TransportMode: Mode = getString("transport-protocol") match { case "tcp" ⇒ Tcp case "udp" ⇒ Udp - case s @ _ ⇒ throw new ConfigurationException("Unknown transport: " + s) + case s @ _ ⇒ throw new ConfigurationException("Unknown transport specified in transport-protocol: " + s) } val EnableSsl: Boolean = if (getBoolean("enable-ssl") && TransportMode == Udp) @@ -76,7 +76,7 @@ class NettyTransportSettings(config: Config) { @deprecated("WARNING: This should only be used by professionals.", "2.0") val PortSelector: Int = getInt("port") - val SslSettings: Option[SslSettings] = if (EnableSsl) Some(new SslSettings(config.getConfig("ssl"))) else None + val SslSettings: Option[SSLSettings] = if (EnableSsl) Some(new SSLSettings(config.getConfig("ssl"))) else None val ServerSocketWorkerPoolSize: Int = computeWPS(config.getConfig("server-socket-worker-pool")) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 4bc4920e0c..5a006e0bd0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -37,7 +37,7 @@ trait TcpHandlers extends CommonHandlers with HasTransport { override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent) { trySend(e.getChannel, Disassociated) - e.getChannel.close() // No graceful close here -- force TCP reset + e.getChannel.close() // No graceful close here } } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 3bf8850339..e5beca0122 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -195,14 +195,13 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "support remote look-ups" in { here ! "ping" - expectMsgPF() { - case ("pong", s: AnyRef) if s eq testActor ⇒ true - } + expectMsg("pong") + lastSender must be(testActor) } "send error message for wrong address" in { EventFilter.error(start = "AssociationError", occurrences = 1).intercept { - system.actorFor("test.akka://remotesys@localhost:12346/user/echo") ! "ping" + system.actorFor("test.akka://nonexistingsystem@localhost:12346/user/echo") ! "ping" } } diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 73de5041c5..77fe3e8e93 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -31,34 +31,34 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val conf = ConfigFactory.parseString( """ - | akka.remoting { - | - | failure-detector { - | threshold = 7.0 - | max-sample-size = 100 - | min-std-deviation = 100 ms - | acceptable-heartbeat-pause = 3 s - | } - | - | heartbeat-interval = 0.1 s - | - | wait-activity-enabled = on - | - | backoff-interval = 1 s - | - | require-cookie = off - | - | secure-cookie = "abcde" - | - | shutdown-timeout = 5 s - | - | startup-timeout = 5 s - | - | retry-latch-closed-for = 0 s - | - | use-passive-connections = on - | } - """.stripMargin) + akka.remoting { + + failure-detector { + threshold = 7.0 + max-sample-size = 100 + min-std-deviation = 100 ms + acceptable-heartbeat-pause = 3 s + } + + heartbeat-interval = 0.1 s + + wait-activity-enabled = on + + backoff-interval = 1 s + + require-cookie = off + + secure-cookie = "abcde" + + shutdown-timeout = 5 s + + startup-timeout = 5 s + + retry-latch-closed-for = 0 s + + use-passive-connections = on + } + """) val localAddress = Address("test", "testsystem", "testhost", 1234) val localAkkaAddress = Address("test.akka", "testsystem", "testhost", 1234) @@ -69,7 +69,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val codec = AkkaPduProtobufCodec val testMsg = RemoteProtocol.MessageProtocol.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build - val testEnvelope = codec.constructMessage(localAkkaAddress, self, testMsg, None) + val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, None) val testMsgPdu: ByteString = codec.constructPayload(testEnvelope) def testHeartbeat = InboundPayload(codec.constructHeartbeat) @@ -123,7 +123,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - self, + testActor, new AkkaProtocolSettings(conf), codec, failureDetector))) @@ -137,7 +137,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - self, + testActor, new AkkaProtocolSettings(conf), codec, failureDetector))) @@ -150,7 +150,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case InboundAssociation(h) ⇒ h } - wrappedHandle.readHandlerPromise.success(self) + wrappedHandle.readHandlerPromise.success(testActor) failureDetector.called must be(true) @@ -170,7 +170,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - self, + testActor, new AkkaProtocolSettings(conf), codec, failureDetector))) @@ -257,7 +257,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - self, + testActor, new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)), codec, failureDetector))) @@ -276,7 +276,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - self, + testActor, new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)), codec, failureDetector))) @@ -288,7 +288,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case InboundAssociation(h) ⇒ h } - wrappedHandle.readHandlerPromise.success(self) + wrappedHandle.readHandlerPromise.success(testActor) failureDetector.called must be(true) @@ -309,9 +309,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re transport, new AkkaProtocolSettings(ConfigFactory.parseString( """ - | akka.remoting.require-cookie = on - | akka.remoting.wait-activity-enabled = off - """.stripMargin).withFallback(conf)), + akka.remoting.require-cookie = on + akka.remoting.wait-activity-enabled = off + """).withFallback(conf)), codec, failureDetector))) @@ -350,7 +350,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case _ ⇒ fail() } - wrappedHandle.readHandlerPromise.success(self) + wrappedHandle.readHandlerPromise.success(testActor) lastActivityIsAssociate(registry, None) must be(true) @@ -388,7 +388,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case _ ⇒ fail() } - wrappedHandle.readHandlerPromise.success(self) + wrappedHandle.readHandlerPromise.success(testActor) Thread.sleep(100) @@ -421,7 +421,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case _ ⇒ fail() } - wrappedHandle.readHandlerPromise.success(self) + wrappedHandle.readHandlerPromise.success(testActor) lastActivityIsAssociate(registry, None) must be(true) @@ -459,7 +459,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re stateActor ! Disassociated - wrappedHandle.readHandlerPromise.success(self) + wrappedHandle.readHandlerPromise.success(testActor) expectMsg(Disassociated)