diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 7b83673c78..b44f2ba063 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -13,6 +13,7 @@ import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle } import akka.serialization.Serialization import akka.util.ByteString import util.control.{ NoStackTrace, NonFatal } +import akka.remote.transport.Transport.InvalidAssociationException /** * Internal API @@ -164,14 +165,14 @@ private[remote] class EndpointWriter( case Event(Send(msg, senderOption, recipient), _) ⇒ stash() stay() - case Event(Transport.Invalid(e), _) ⇒ + case Event(Status.Failure(e: InvalidAssociationException), _) ⇒ log.error(e, "Tried to associate with invalid remote address [{}]. " + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) - case Event(Transport.Fail(e), _) ⇒ + case Event(Status.Failure(e), _) ⇒ publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e)) - case Event(Transport.Ready(inboundHandle), _) ⇒ + case Event(inboundHandle: AssociationHandle, _) ⇒ handle = Some(inboundHandle) startReadEndpoint() goto(Writing) diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index f353d2889d..b46f89262b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -62,7 +62,7 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor protected def interceptListen(listenAddress: Address, listenerFuture: Future[AssociationEventListener]): Future[AssociationEventListener] - protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit + protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit override def schemeIdentifier: String = augmentScheme(wrappedTransport.schemeIdentifier) @@ -71,22 +71,17 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor override def maximumPayloadBytes: Int = wrappedTransport.maximumPayloadBytes - maximumOverhead override def listen: Future[(Address, Promise[AssociationEventListener])] = { - val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise() val upstreamListenerPromise: Promise[AssociationEventListener] = Promise() - wrappedTransport.listen.onComplete { + wrappedTransport.listen andThen { case Success((listenAddress, listenerPromise)) ⇒ // Register to downstream listenerPromise.tryCompleteWith(interceptListen(listenAddress, upstreamListenerPromise.future)) - // Notify upstream - listenPromise.success((augmentScheme(listenAddress), upstreamListenerPromise)) - case Failure(reason) ⇒ listenPromise.failure(reason) - } - listenPromise.future + } map { case (listenAddress, _) ⇒ (augmentScheme(listenAddress), upstreamListenerPromise) } } - override def associate(remoteAddress: Address): Future[Status] = { + override def associate(remoteAddress: Address): Future[AssociationHandle] = { // Prepare a future, and pass its promise to the manager - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() interceptAssociate(removeScheme(remoteAddress), statusPromise) @@ -118,7 +113,7 @@ object ActorTransportAdapter { sealed trait TransportOperation case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation - case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[Status]) extends TransportOperation + case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation case class ListenUnderlying(listenAddress: Address, upstreamListener: Future[AssociationEventListener]) extends TransportOperation case object DisassociateUnderlying extends TransportOperation @@ -149,7 +144,7 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS } } - override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit = + override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = manager ! AssociateUnderlying(remoteAddress, statusPromise) override def shutdown(): Unit = manager ! PoisonPill diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index d9176c9f7b..64a0e033a0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -19,7 +19,9 @@ import scala.util.{ Success, Failure } import scala.collection.immutable import akka.remote.transport.ActorTransportAdapter._ -class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace +class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace { + def this(msg: String) = this(msg, null) +} private[remote] class AkkaProtocolSettings(config: Config) { @@ -221,11 +223,11 @@ private[transport] object ProtocolStateActor { trait InitialProtocolStateData extends ProtocolStateData // Neither the underlying, nor the provided transport is associated - case class OutboundUnassociated(remoteAddress: Address, statusPromise: Promise[Status], transport: Transport) + case class OutboundUnassociated(remoteAddress: Address, statusPromise: Promise[AssociationHandle], transport: Transport) extends InitialProtocolStateData // The underlying transport is associated, but the handshake of the akka protocol is not yet finished - case class OutboundUnderlyingAssociated(statusPromise: Promise[Status], wrappedHandle: AssociationHandle) + case class OutboundUnderlyingAssociated(statusPromise: Promise[AssociationHandle], wrappedHandle: AssociationHandle) extends ProtocolStateData // The underlying transport is associated, but the handshake of the akka protocol is not yet finished @@ -256,7 +258,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat // Outbound case def this(localAddress: Address, remoteAddress: Address, - statusPromise: Promise[Status], + statusPromise: Promise[AssociationHandle], transport: Transport, settings: AkkaProtocolSettings, codec: AkkaPduCodec, @@ -287,15 +289,11 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat when(Closed) { // Transport layer events for outbound associations - case Event(s @ Invalid(_), OutboundUnassociated(_, statusPromise, _)) ⇒ - statusPromise.success(s) + case Event(Status.Failure(e), OutboundUnassociated(_, statusPromise, _)) ⇒ + statusPromise.failure(e) stop() - case Event(s @ Fail(_), OutboundUnassociated(_, statusPromise, _)) ⇒ - statusPromise.success(s) - stop() - - case Event(Ready(wrappedHandle), OutboundUnassociated(_, statusPromise, _)) ⇒ + case Event(wrappedHandle: AssociationHandle, OutboundUnassociated(_, statusPromise, _)) ⇒ wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self)) sendAssociate(wrappedHandle) failureDetector.heartbeat() @@ -431,14 +429,14 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat onTermination { case StopEvent(_, _, OutboundUnassociated(remoteAddress, statusPromise, transport)) ⇒ - statusPromise.trySuccess(Fail(new AkkaProtocolException("Transport disassociated before handshake finished", null))) + statusPromise.tryFailure(new AkkaProtocolException("Transport disassociated before handshake finished", null)) case StopEvent(reason, _, OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) ⇒ val msg = reason match { case FSM.Failure(TimeoutReason) ⇒ "No response from remote. Handshake timed out." case _ ⇒ "Remote endpoint disassociated before handshake finished" } - statusPromise.trySuccess(Fail(new AkkaProtocolException(msg, null))) + statusPromise.tryFailure(new AkkaProtocolException(msg, null)) wrappedHandle.disassociate() case StopEvent(_, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒ @@ -458,7 +456,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self private def notifyOutboundHandler(wrappedHandle: AssociationHandle, - statusPromise: Promise[Status]): Future[HandleEventListener] = { + statusPromise: Promise[AssociationHandle]): Future[HandleEventListener] = { val readHandlerPromise: Promise[HandleEventListener] = Promise() listenForListenerRegistration(readHandlerPromise) @@ -471,7 +469,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat self, codec) - statusPromise.success(Ready(exposedHandle)) + statusPromise.success(exposedHandle) readHandlerPromise.future } diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index c122745536..59be194dde 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -71,18 +71,14 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor Future.successful(this) } - protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit = { + protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = { // Association is simulated to be failed if there was either an inbound or outbound message drop if (shouldDropInbound(remoteAddress) || shouldDropOutbound(remoteAddress)) - statusPromise.success(Fail(new FailureInjectorException("Simulated failure of association to " + remoteAddress))) + statusPromise.failure(new FailureInjectorException("Simulated failure of association to " + remoteAddress)) else - statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map { - _ match { - case Ready(handle) ⇒ - addressChaosTable.putIfAbsent(handle.remoteAddress.copy(protocol = "", system = ""), PassThru) - Ready(new FailureInjectorHandle(handle, this)) - case s: Status ⇒ s - } + statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map { handle ⇒ + addressChaosTable.putIfAbsent(handle.remoteAddress.copy(protocol = "", system = ""), PassThru) + new FailureInjectorHandle(handle, this) }) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index 02d9f08433..200e038778 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -44,10 +44,10 @@ class TestTransport( associationListenerPromise.future.onSuccess { case listener: AssociationEventListener ⇒ registry.registerTransport(this, listener) } - Promise.successful((localAddress, associationListenerPromise)).future + Future.successful((localAddress, associationListenerPromise)) } - private def defaultAssociate(remoteAddress: Address): Future[Status] = { + private def defaultAssociate(remoteAddress: Address): Future[AssociationHandle] = { registry.transportFor(remoteAddress) match { case Some((remoteTransport, listener)) ⇒ @@ -61,10 +61,10 @@ class TestTransport( registry.registerListenerPair(localHandle.key, bothSides) listener notify InboundAssociation(remoteHandle) - Promise.successful(Ready(localHandle)).future + Future.successful(localHandle) case None ⇒ - Promise.successful(Fail(new IllegalArgumentException(s"No registered transport: $remoteAddress"))).future + Future.failed(new IllegalArgumentException(s"No registered transport: $remoteAddress")) } } @@ -75,7 +75,7 @@ class TestTransport( (localHandle, remoteHandle) } - private def defaultShutdown: Future[Unit] = Promise.successful(()).future + private def defaultShutdown: Future[Unit] = Future.successful(()) /** * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method. @@ -87,7 +87,7 @@ class TestTransport( /** * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method. */ - val associateBehavior = new SwitchableLoggedBehavior[Address, Status]( + val associateBehavior = new SwitchableLoggedBehavior[Address, AssociationHandle]( defaultAssociate _, (remoteAddress) ⇒ registry.logActivity(AssociateAttempt(localAddress, remoteAddress))) @@ -99,7 +99,7 @@ class TestTransport( (_) ⇒ registry.logActivity(ShutdownAttempt(localAddress))) override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior() - override def associate(remoteAddress: Address): Future[Status] = associateBehavior(remoteAddress) + override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress) override def shutdown(): Unit = shutdownBehavior() private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = { @@ -111,7 +111,7 @@ class TestTransport( } writePromise.future case None ⇒ - Promise.failed(new IllegalStateException("No association present")).future + Future.failed(new IllegalStateException("No association present")) } } @@ -123,7 +123,7 @@ class TestTransport( } } - Promise.successful(()).future + Future.successful(()) } /** @@ -210,7 +210,7 @@ object TestTransport { * The constant the future will be completed with. */ def pushConstant(c: B): Unit = push { - (x) ⇒ Promise.successful(c).future + (x) ⇒ Future.successful(c) } /** @@ -220,7 +220,7 @@ object TestTransport { * The throwable the failed future will contain. */ def pushError(e: Throwable): Unit = push { - (x) ⇒ Promise.failed(e).future + (x) ⇒ Future.failed(e) } /** @@ -332,8 +332,8 @@ object TestTransport { * Indicates if all given transports were successfully registered. No associations can be established between * transports that are not yet registered. * - * @param transports - * The transports that participate in the test case. + * @param addresses + * The listen addresses of transports that participate in the test case. * @return * True if all transports are successfully registered. */ diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 5164d6969f..c007b3d7d2 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -19,7 +19,7 @@ import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.Promise import scala.math.min -import scala.util.Success +import scala.util.{ Success, Failure } import scala.util.control.NonFatal import scala.concurrent.duration._ @@ -151,16 +151,20 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A val wrappedHandle = wrapHandle(handle, associationListener, inbound = true) wrappedHandle.throttlerActor ! wrappedHandle case AssociateUnderlying(remoteAddress, statusPromise) ⇒ - wrappedTransport.associate(remoteAddress).onComplete { - case Success(Ready(handle)) ⇒ - val wrappedHandle = wrapHandle(handle, associationListener, inbound = false) - val inMode = getInboundMode(nakedAddress(remoteAddress)) - wrappedHandle.outboundThrottleMode.set(getOutboundMode(nakedAddress(remoteAddress))) - wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor - handleTable ::= nakedAddress(remoteAddress) -> wrappedHandle - statusPromise.success(Ready(wrappedHandle)) - case s @ _ ⇒ statusPromise.complete(s) + wrappedTransport.associate(remoteAddress) onComplete { + // Slight modification of pipe, only success is sent, failure is propagated to a separate future + case Success(handle) ⇒ self ! (handle, statusPromise) + case Failure(e) ⇒ statusPromise.failure(e) } + // Finished outbound association and got back the handle + case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒ + val wrappedHandle = wrapHandle(handle, associationListener, inbound = false) + val naked = nakedAddress(handle.remoteAddress) + val inMode = getInboundMode(naked) + wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked)) + wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor + handleTable ::= nakedAddress(naked) -> wrappedHandle + statusPromise.success(wrappedHandle) case s @ SetThrottle(address, direction, mode) ⇒ val naked = nakedAddress(address) throttlingModes += naked -> (mode, direction) diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 6e3be7aa5d..b2cdbcc54e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -4,39 +4,17 @@ import scala.concurrent.{ Promise, Future } import akka.actor.{ ActorRef, Address } import akka.util.ByteString import akka.remote.transport.AssociationHandle.HandleEventListener +import akka.AkkaException object Transport { trait AssociationEvent - /** - * Represents fine grained status of an association attempt. - */ - sealed trait Status extends AssociationEvent - /** * Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address, * hostname, etc.). */ - case class Invalid(cause: Throwable) extends Status - - /** - * The association setup has failed, but it is not known that a recovery is possible or not. Generally it means - * that the transport gave up its attempts to associate, but a retry might be successful at a later time. - * - * @param cause Cause of the failure - */ - case class Fail(cause: Throwable) extends Status - - /** - * No detectable errors happened during association. Generally a status of Ready does not guarantee that the - * association was successful. For example in the case of UDP, the transport MAY return Ready immediately after an - * association setup was requested. - * - * @param association - * The handle for the created association. - */ - case class Ready(association: AssociationHandle) extends Status + case class InvalidAssociationException(msg: String, cause: Throwable) extends AkkaException(msg, cause) /** * Message sent to a [[akka.remote.transport.Transport.AssociationEventListener]] registered to a transport @@ -125,15 +103,16 @@ trait Transport { * real transport-layer connection (TCP), more lightweight connections provided over datagram protocols (UDP with * additional services), substreams of multiplexed connections (SCTP) or physical links (serial port). * - * This call returns a fine-grained status indication of the attempt wrapped in a Future. See - * [[akka.remote.transport.Transport.Status]] for details. + * This call returns a future of an [[akka.remote.transport.AssociationHandle]]. A failed future indicates that + * the association attempt was unsuccessful. If the exception is [[akka.remote.transport.Transport.InvalidAssociationException]] + * then the association request was invalid, and it is impossible to recover. * * @param remoteAddress * The address of the remote transport entity. * @return * A status instance representing failure or a success containing an [[akka.remote.transport.AssociationHandle]] */ - def associate(remoteAddress: Address): Future[Status] + def associate(remoteAddress: Address): Future[AssociationHandle] /** * Shuts down the transport layer and releases all the corresponding resources. Shutdown is asynchronous, may be diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 6a7ed190a6..1106c75b21 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -30,7 +30,9 @@ object NettyTransportSettings { case object Udp extends Mode { override def toString = "udp" } } -class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) +class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) { + def this(msg: String) = this(msg, null) +} class NettyTransportSettings(config: Config) { @@ -137,12 +139,12 @@ abstract class ServerHandler(protected final val transport: NettyTransport, } abstract class ClientHandler(protected final val transport: NettyTransport, - private final val statusPromise: Promise[Status]) + private final val statusPromise: Promise[AssociationHandle]) extends NettyClientHelpers with CommonHandlers { final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = { channel.setReadable(false) - init(channel, remoteSocketAddress, msg) { handle ⇒ statusPromise.success(Ready(handle)) } + init(channel, remoteSocketAddress, msg)(statusPromise.success) } } @@ -227,7 +229,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s } } - private def clientPipelineFactory(statusPromise: Promise[Status]): ChannelPipelineFactory = new ChannelPipelineFactory { + private def clientPipelineFactory(statusPromise: Promise[AssociationHandle]): ChannelPipelineFactory = new ChannelPipelineFactory { override def getPipeline: ChannelPipeline = { val pipeline = newPipeline if (EnableSsl) pipeline.addFirst("SslHandler", NettySSLSupport(settings.SslSettings.get, log, true)) @@ -258,7 +260,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s case Udp ⇒ setupBootstrap(new ConnectionlessBootstrap(serverChannelFactory), serverPipelineFactory) } - private def outboundBootstrap(statusPromise: Promise[Status]): ClientBootstrap = { + private def outboundBootstrap(statusPromise: Promise[AssociationHandle]): ClientBootstrap = { val bootstrap = setupBootstrap(new ClientBootstrap(clientChannelFactory), clientPipelineFactory(statusPromise)) bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) bootstrap @@ -307,7 +309,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s case None ⇒ listenPromise.failure( - new NettyTransportException(s"Unknown local address type ${serverChannel.getLocalAddress.getClass}", null)) + new NettyTransportException(s"Unknown local address type ${serverChannel.getLocalAddress.getClass}")) } } catch { @@ -317,58 +319,61 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s listenPromise.future } - override def associate(remoteAddress: Address): Future[Status] = { - val statusPromise: Promise[Status] = Promise() + override def associate(remoteAddress: Address): Future[AssociationHandle] = { + val statusPromise: Promise[AssociationHandle] = Promise() - if (!serverChannel.isBound) statusPromise.success(Fail(new NettyTransportException("Transport is not bound", null))) + if (!serverChannel.isBound) statusPromise.failure(new NettyTransportException("Transport is not bound")) + else { - try { - if (!isDatagram) { - val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress)) + try { + if (!isDatagram) { + val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress)) - connectFuture.addListener(new ChannelFutureListener { - override def operationComplete(future: ChannelFuture) { - if (!future.isSuccess) - statusPromise.failure(future.getCause) - else if (future.isCancelled) - statusPromise.failure(new NettyTransportException("Connection was cancelled", null)) + connectFuture.addListener(new ChannelFutureListener { + override def operationComplete(future: ChannelFuture) { + if (!future.isSuccess) + statusPromise.failure(future.getCause) + else if (future.isCancelled) + statusPromise.failure(new NettyTransportException("Connection was cancelled")) - } - }) + } + }) - } else { - val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress)) + } else { + val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress)) - connectFuture.addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (!future.isSuccess) - statusPromise.failure(future.getCause) - else if (future.isCancelled) - statusPromise.failure(new NettyTransportException("Connection was cancelled", null)) - else { - val handle: UdpAssociationHandle = new UdpAssociationHandle(localAddress, remoteAddress, future.getChannel, NettyTransport.this) + connectFuture.addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (!future.isSuccess) + statusPromise.failure(future.getCause) + else if (future.isCancelled) + statusPromise.failure(new NettyTransportException("Connection was cancelled")) + else { + val handle: UdpAssociationHandle = + new UdpAssociationHandle(localAddress, remoteAddress, future.getChannel, NettyTransport.this) - future.getChannel.getRemoteAddress match { - case addr: InetSocketAddress ⇒ - statusPromise.success(Ready(handle)) - handle.readHandlerPromise.future.onSuccess { - case listener: HandleEventListener ⇒ udpConnectionTable.put(addr, listener) - } - case a @ _ ⇒ statusPromise.success(Fail( - new NettyTransportException("Unknown remote address type " + a.getClass, null))) + future.getChannel.getRemoteAddress match { + case addr: InetSocketAddress ⇒ + statusPromise.success(handle) + handle.readHandlerPromise.future.onSuccess { + case listener: HandleEventListener ⇒ udpConnectionTable.put(addr, listener) + } + case a ⇒ statusPromise.failure( + new NettyTransportException("Unknown remote address type " + a.getClass)) + } } } - } - }) + }) + } + + } catch { + + case e @ (_: UnknownHostException | _: SecurityException | _: IllegalArgumentException) ⇒ + statusPromise.failure(InvalidAssociationException("Invalid association ", e)) + + case NonFatal(e) ⇒ + statusPromise.failure(e) } - - } catch { - - case e @ (_: UnknownHostException | _: SecurityException | _: IllegalArgumentException) ⇒ - statusPromise.success(Invalid(e)) - - case NonFatal(e) ⇒ - statusPromise.success(Fail(e)) } statusPromise.future diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 6911806e01..6e8b714fb0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -3,7 +3,7 @@ package akka.remote.transport.netty import akka.actor.Address import akka.remote.transport.AssociationHandle import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener, Disassociated, InboundPayload } -import akka.remote.transport.Transport.{ AssociationEventListener, Status } +import akka.remote.transport.Transport.AssociationEventListener import akka.util.ByteString import java.net.InetSocketAddress import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer } @@ -50,7 +50,7 @@ private[remote] class TcpServerHandler(_transport: NettyTransport, _associationL } -private[remote] class TcpClientHandler(_transport: NettyTransport, _statusPromise: Promise[Status]) +private[remote] class TcpClientHandler(_transport: NettyTransport, _statusPromise: Promise[AssociationHandle]) extends ClientHandler(_transport, _statusPromise) with TcpHandlers { override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent) { diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index c949990c5c..2b7af65c98 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -3,7 +3,7 @@ package akka.remote.transport.netty import akka.actor.Address import akka.remote.transport.AssociationHandle import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload } -import akka.remote.transport.Transport.{ AssociationEventListener, Status } +import akka.remote.transport.Transport.AssociationEventListener import akka.util.ByteString import java.net.{ SocketAddress, InetAddress, InetSocketAddress } import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } @@ -49,7 +49,7 @@ private[remote] class UdpServerHandler(_transport: NettyTransport, _associationL initInbound(channel, remoteSocketAddress, msg) } -private[remote] class UdpClientHandler(_transport: NettyTransport, _statusPromise: Promise[Status]) +private[remote] class UdpClientHandler(_transport: NettyTransport, _statusPromise: Promise[AssociationHandle]) extends ClientHandler(_transport, _statusPromise) with UdpHandlers { override def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = diff --git a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala index 821ae71d3c..03b8b5a921 100644 --- a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala @@ -37,6 +37,10 @@ akka.loglevel = DEBUG val target1 = other.actorFor(RootActorPath(addr) / "remote") val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements) + override def atTermination() { + other.shutdown() + } + // need to enable debug log-level without actually printing those messages system.eventStream.publish(TestEvent.Mute(EventFilter.debug())) diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index ce630c4ac7..d3a3125c3b 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -189,9 +189,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re "serve the handle as soon as possible if WaitActivity is turned off" in { val (failureDetector, registry, transport, handle) = collaborators - transport.associateBehavior.pushConstant(Transport.Ready(handle)) + transport.associateBehavior.pushConstant(handle) - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() system.actorOf(Props(new ProtocolStateActor( localAddress, @@ -203,7 +203,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re failureDetector))) Await.result(statusPromise.future, 3 seconds) match { - case Transport.Ready(h) ⇒ + case h: AssociationHandle ⇒ h.remoteAddress must be === remoteAkkaAddress h.localAddress must be === localAkkaAddress @@ -217,9 +217,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re "in outbound mode with WaitActivity delay readiness until activity detected" in { val (failureDetector, registry, transport, handle) = collaborators - transport.associateBehavior.pushConstant(Transport.Ready(handle)) + transport.associateBehavior.pushConstant(handle) - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, @@ -242,7 +242,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re reader ! testPayload Await.result(statusPromise.future, 3 seconds) match { - case Transport.Ready(h) ⇒ + case h: AssociationHandle ⇒ h.remoteAddress must be === remoteAkkaAddress h.localAddress must be === localAkkaAddress @@ -298,9 +298,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re "send cookie in Associate PDU if configured to do so" in { val (failureDetector, registry, transport, handle) = collaborators - transport.associateBehavior.pushConstant(Transport.Ready(handle)) + transport.associateBehavior.pushConstant(handle) - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() system.actorOf(Props(new ProtocolStateActor( localAddress, @@ -316,7 +316,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re failureDetector))) Await.result(statusPromise.future, 3 seconds) match { - case Transport.Ready(h) ⇒ + case h: AssociationHandle ⇒ h.remoteAddress must be === remoteAkkaAddress h.localAddress must be === localAkkaAddress @@ -328,9 +328,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re "handle explicit disassociate messages" in { val (failureDetector, registry, transport, handle) = collaborators - transport.associateBehavior.pushConstant(Transport.Ready(handle)) + transport.associateBehavior.pushConstant(handle) - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, @@ -342,7 +342,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re failureDetector))) val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match { - case Transport.Ready(h) ⇒ + case h: AssociationHandle ⇒ h.remoteAddress must be === remoteAkkaAddress h.localAddress must be === localAkkaAddress h @@ -361,9 +361,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re "handle transport level disassociations" in { val (failureDetector, registry, transport, handle) = collaborators - transport.associateBehavior.pushConstant(Transport.Ready(handle)) + transport.associateBehavior.pushConstant(handle) - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, @@ -380,7 +380,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re reader ! testHeartbeat val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match { - case Transport.Ready(h) ⇒ + case h: AssociationHandle ⇒ h.remoteAddress must be === remoteAkkaAddress h.localAddress must be === localAkkaAddress h @@ -399,9 +399,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re "disassociate when failure detector signals failure" in { val (failureDetector, registry, transport, handle) = collaborators - transport.associateBehavior.pushConstant(Transport.Ready(handle)) + transport.associateBehavior.pushConstant(handle) - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() system.actorOf(Props(new ProtocolStateActor( localAddress, @@ -413,7 +413,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re failureDetector))) val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match { - case Transport.Ready(h) ⇒ + case h: AssociationHandle ⇒ h.remoteAddress must be === remoteAkkaAddress h.localAddress must be === localAkkaAddress h @@ -435,9 +435,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re "handle correctly when the handler is registered only after the association is already closed" in { val (failureDetector, _, transport, handle) = collaborators - transport.associateBehavior.pushConstant(Transport.Ready(handle)) + transport.associateBehavior.pushConstant(handle) - val statusPromise: Promise[Status] = Promise() + val statusPromise: Promise[AssociationHandle] = Promise() val stateActor = system.actorOf(Props(new ProtocolStateActor( localAddress, @@ -449,7 +449,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re failureDetector))) val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match { - case Transport.Ready(h) ⇒ + case h: AssociationHandle ⇒ h.remoteAddress must be === remoteAkkaAddress h.localAddress must be === localAkkaAddress h diff --git a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala index 7974584f2d..4a33317ef3 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala @@ -8,13 +8,7 @@ import akka.testkit.{ ImplicitSender, DefaultTimeout, AkkaSpec } import akka.util.ByteString import scala.concurrent.{ Future, Await } import akka.remote.RemoteActorRefProvider -import akka.remote.transport.Transport.InboundAssociation -import akka.remote.transport.TestTransport.DisassociateAttempt -import akka.remote.transport.TestTransport.WriteAttempt -import akka.remote.transport.TestTransport.ListenAttempt -import akka.remote.transport.Transport.Fail -import akka.remote.transport.TestTransport.AssociateAttempt -import akka.remote.transport.Transport.Ready +import akka.remote.transport.TestTransport.{ DisassociateAttempt, WriteAttempt, ListenAttempt, AssociateAttempt } abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """) @@ -85,10 +79,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressATest)) - Await.result(transportA.associate(nonExistingAddress), timeout.duration) match { - case Fail(_) ⇒ - case _ ⇒ fail() - } + // TestTransport throws IllegalArgumentException when trying to associate with non-existing system + intercept[IllegalArgumentException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) } } "successfully send PDUs" in { @@ -101,12 +93,12 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) awaitCond(registry.transportsReady(addressATest, addressBTest)) - val associate: Future[Status] = transportA.associate(addressB) + val associate: Future[AssociationHandle] = transportA.associate(addressB) val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle } - val Ready(handleA) = Await.result(associate, timeout.duration) + val handleA = Await.result(associate, timeout.duration) // Initialize handles handleA.readHandlerPromise.success(ActorHandleEventListener(self)) @@ -139,12 +131,12 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) awaitCond(registry.transportsReady(addressATest, addressBTest)) - val associate: Future[Status] = transportA.associate(addressB) + val associate: Future[AssociationHandle] = transportA.associate(addressB) val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle } - val Ready(handleA) = Await.result(associate, timeout.duration) + val handleA = Await.result(associate, timeout.duration) // Initialize handles handleA.readHandlerPromise.success(ActorHandleEventListener(self)) diff --git a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala index 4c4381e023..e253247616 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala @@ -55,10 +55,10 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender var transportA = new TestTransport(addressA, registry) Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) - Await.result(transportA.associate(nonExistingAddress), timeout.duration) match { - case Fail(_) ⇒ - case _ ⇒ fail() - } + + // TestTransport throws IllegalArgumentException when trying to associate with non-existing system + intercept[IllegalArgumentException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) } + } "emulate sending PDUs and logs write" in { @@ -71,12 +71,12 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender awaitCond(registry.transportsReady(addressA, addressB)) - val associate: Future[Status] = transportA.associate(addressB) + val associate: Future[AssociationHandle] = transportA.associate(addressB) val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle } - val Ready(handleA) = Await.result(associate, timeout.duration) + val handleA = Await.result(associate, timeout.duration) // Initialize handles handleA.readHandlerPromise.success(ActorHandleEventListener(self)) @@ -108,12 +108,12 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender awaitCond(registry.transportsReady(addressA, addressB)) - val associate: Future[Status] = transportA.associate(addressB) + val associate: Future[AssociationHandle] = transportA.associate(addressB) val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") { case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle } - val Ready(handleA) = Await.result(associate, timeout.duration) + val handleA = Await.result(associate, timeout.duration) // Initialize handles handleA.readHandlerPromise.success(ActorHandleEventListener(self)) diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala index 4ab7b76d2a..d05dc073cb 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -5,11 +5,7 @@ import akka.actor._ import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec } import ThrottlerTransportAdapterSpec._ import scala.concurrent.duration._ -import akka.remote.transport.TestTransport.{ WriteAttempt, AssociationRegistry } -import scala.concurrent.{ Promise, Future, Await } -import akka.remote.transport.Transport.{ Ready, InboundAssociation, Status } -import akka.util.ByteString -import akka.remote.transport.AssociationHandle.InboundPayload +import scala.concurrent.Await import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle } import akka.remote.RemoteActorRefProvider