Fixes in NettyTransport, ThrottlerTransportAdapter, removal of Status #2844
- Promise was completed twice erroneously - Purged the trait Status from Transport, using failed futures instead - using intercept instead of try ... catch in tests - added termination hook for UntrustedSpec to shut down second system - Fixed threading issues in ThrottlerTransportAdapter - Removed nulls from exception constructors - replaced Promise.successful(...).future with Future.successful
This commit is contained in:
parent
0585651794
commit
af5e756508
15 changed files with 160 additions and 190 deletions
|
|
@ -13,6 +13,7 @@ import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle }
|
||||||
import akka.serialization.Serialization
|
import akka.serialization.Serialization
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import util.control.{ NoStackTrace, NonFatal }
|
import util.control.{ NoStackTrace, NonFatal }
|
||||||
|
import akka.remote.transport.Transport.InvalidAssociationException
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal API
|
* Internal API
|
||||||
|
|
@ -164,14 +165,14 @@ private[remote] class EndpointWriter(
|
||||||
case Event(Send(msg, senderOption, recipient), _) ⇒
|
case Event(Send(msg, senderOption, recipient), _) ⇒
|
||||||
stash()
|
stash()
|
||||||
stay()
|
stay()
|
||||||
case Event(Transport.Invalid(e), _) ⇒
|
case Event(Status.Failure(e: InvalidAssociationException), _) ⇒
|
||||||
log.error(e, "Tried to associate with invalid remote address [{}]. " +
|
log.error(e, "Tried to associate with invalid remote address [{}]. " +
|
||||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||||
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
||||||
|
|
||||||
case Event(Transport.Fail(e), _) ⇒
|
case Event(Status.Failure(e), _) ⇒
|
||||||
publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e))
|
publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e))
|
||||||
case Event(Transport.Ready(inboundHandle), _) ⇒
|
case Event(inboundHandle: AssociationHandle, _) ⇒
|
||||||
handle = Some(inboundHandle)
|
handle = Some(inboundHandle)
|
||||||
startReadEndpoint()
|
startReadEndpoint()
|
||||||
goto(Writing)
|
goto(Writing)
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor
|
||||||
protected def interceptListen(listenAddress: Address,
|
protected def interceptListen(listenAddress: Address,
|
||||||
listenerFuture: Future[AssociationEventListener]): Future[AssociationEventListener]
|
listenerFuture: Future[AssociationEventListener]): Future[AssociationEventListener]
|
||||||
|
|
||||||
protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit
|
protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit
|
||||||
|
|
||||||
override def schemeIdentifier: String = augmentScheme(wrappedTransport.schemeIdentifier)
|
override def schemeIdentifier: String = augmentScheme(wrappedTransport.schemeIdentifier)
|
||||||
|
|
||||||
|
|
@ -71,22 +71,17 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor
|
||||||
override def maximumPayloadBytes: Int = wrappedTransport.maximumPayloadBytes - maximumOverhead
|
override def maximumPayloadBytes: Int = wrappedTransport.maximumPayloadBytes - maximumOverhead
|
||||||
|
|
||||||
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
|
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
|
||||||
val listenPromise: Promise[(Address, Promise[AssociationEventListener])] = Promise()
|
|
||||||
val upstreamListenerPromise: Promise[AssociationEventListener] = Promise()
|
val upstreamListenerPromise: Promise[AssociationEventListener] = Promise()
|
||||||
wrappedTransport.listen.onComplete {
|
wrappedTransport.listen andThen {
|
||||||
case Success((listenAddress, listenerPromise)) ⇒
|
case Success((listenAddress, listenerPromise)) ⇒
|
||||||
// Register to downstream
|
// Register to downstream
|
||||||
listenerPromise.tryCompleteWith(interceptListen(listenAddress, upstreamListenerPromise.future))
|
listenerPromise.tryCompleteWith(interceptListen(listenAddress, upstreamListenerPromise.future))
|
||||||
// Notify upstream
|
} map { case (listenAddress, _) ⇒ (augmentScheme(listenAddress), upstreamListenerPromise) }
|
||||||
listenPromise.success((augmentScheme(listenAddress), upstreamListenerPromise))
|
|
||||||
case Failure(reason) ⇒ listenPromise.failure(reason)
|
|
||||||
}
|
|
||||||
listenPromise.future
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def associate(remoteAddress: Address): Future[Status] = {
|
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
|
||||||
// Prepare a future, and pass its promise to the manager
|
// Prepare a future, and pass its promise to the manager
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
interceptAssociate(removeScheme(remoteAddress), statusPromise)
|
interceptAssociate(removeScheme(remoteAddress), statusPromise)
|
||||||
|
|
||||||
|
|
@ -118,7 +113,7 @@ object ActorTransportAdapter {
|
||||||
sealed trait TransportOperation
|
sealed trait TransportOperation
|
||||||
|
|
||||||
case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation
|
case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation
|
||||||
case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[Status]) extends TransportOperation
|
case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation
|
||||||
case class ListenUnderlying(listenAddress: Address,
|
case class ListenUnderlying(listenAddress: Address,
|
||||||
upstreamListener: Future[AssociationEventListener]) extends TransportOperation
|
upstreamListener: Future[AssociationEventListener]) extends TransportOperation
|
||||||
case object DisassociateUnderlying extends TransportOperation
|
case object DisassociateUnderlying extends TransportOperation
|
||||||
|
|
@ -149,7 +144,7 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit =
|
override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit =
|
||||||
manager ! AssociateUnderlying(remoteAddress, statusPromise)
|
manager ! AssociateUnderlying(remoteAddress, statusPromise)
|
||||||
|
|
||||||
override def shutdown(): Unit = manager ! PoisonPill
|
override def shutdown(): Unit = manager ! PoisonPill
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,9 @@ import scala.util.{ Success, Failure }
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import akka.remote.transport.ActorTransportAdapter._
|
import akka.remote.transport.ActorTransportAdapter._
|
||||||
|
|
||||||
class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace
|
class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace {
|
||||||
|
def this(msg: String) = this(msg, null)
|
||||||
|
}
|
||||||
|
|
||||||
private[remote] class AkkaProtocolSettings(config: Config) {
|
private[remote] class AkkaProtocolSettings(config: Config) {
|
||||||
|
|
||||||
|
|
@ -221,11 +223,11 @@ private[transport] object ProtocolStateActor {
|
||||||
trait InitialProtocolStateData extends ProtocolStateData
|
trait InitialProtocolStateData extends ProtocolStateData
|
||||||
|
|
||||||
// Neither the underlying, nor the provided transport is associated
|
// Neither the underlying, nor the provided transport is associated
|
||||||
case class OutboundUnassociated(remoteAddress: Address, statusPromise: Promise[Status], transport: Transport)
|
case class OutboundUnassociated(remoteAddress: Address, statusPromise: Promise[AssociationHandle], transport: Transport)
|
||||||
extends InitialProtocolStateData
|
extends InitialProtocolStateData
|
||||||
|
|
||||||
// The underlying transport is associated, but the handshake of the akka protocol is not yet finished
|
// The underlying transport is associated, but the handshake of the akka protocol is not yet finished
|
||||||
case class OutboundUnderlyingAssociated(statusPromise: Promise[Status], wrappedHandle: AssociationHandle)
|
case class OutboundUnderlyingAssociated(statusPromise: Promise[AssociationHandle], wrappedHandle: AssociationHandle)
|
||||||
extends ProtocolStateData
|
extends ProtocolStateData
|
||||||
|
|
||||||
// The underlying transport is associated, but the handshake of the akka protocol is not yet finished
|
// The underlying transport is associated, but the handshake of the akka protocol is not yet finished
|
||||||
|
|
@ -256,7 +258,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
||||||
// Outbound case
|
// Outbound case
|
||||||
def this(localAddress: Address,
|
def this(localAddress: Address,
|
||||||
remoteAddress: Address,
|
remoteAddress: Address,
|
||||||
statusPromise: Promise[Status],
|
statusPromise: Promise[AssociationHandle],
|
||||||
transport: Transport,
|
transport: Transport,
|
||||||
settings: AkkaProtocolSettings,
|
settings: AkkaProtocolSettings,
|
||||||
codec: AkkaPduCodec,
|
codec: AkkaPduCodec,
|
||||||
|
|
@ -287,15 +289,11 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
||||||
when(Closed) {
|
when(Closed) {
|
||||||
|
|
||||||
// Transport layer events for outbound associations
|
// Transport layer events for outbound associations
|
||||||
case Event(s @ Invalid(_), OutboundUnassociated(_, statusPromise, _)) ⇒
|
case Event(Status.Failure(e), OutboundUnassociated(_, statusPromise, _)) ⇒
|
||||||
statusPromise.success(s)
|
statusPromise.failure(e)
|
||||||
stop()
|
stop()
|
||||||
|
|
||||||
case Event(s @ Fail(_), OutboundUnassociated(_, statusPromise, _)) ⇒
|
case Event(wrappedHandle: AssociationHandle, OutboundUnassociated(_, statusPromise, _)) ⇒
|
||||||
statusPromise.success(s)
|
|
||||||
stop()
|
|
||||||
|
|
||||||
case Event(Ready(wrappedHandle), OutboundUnassociated(_, statusPromise, _)) ⇒
|
|
||||||
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self))
|
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self))
|
||||||
sendAssociate(wrappedHandle)
|
sendAssociate(wrappedHandle)
|
||||||
failureDetector.heartbeat()
|
failureDetector.heartbeat()
|
||||||
|
|
@ -431,14 +429,14 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
||||||
|
|
||||||
onTermination {
|
onTermination {
|
||||||
case StopEvent(_, _, OutboundUnassociated(remoteAddress, statusPromise, transport)) ⇒
|
case StopEvent(_, _, OutboundUnassociated(remoteAddress, statusPromise, transport)) ⇒
|
||||||
statusPromise.trySuccess(Fail(new AkkaProtocolException("Transport disassociated before handshake finished", null)))
|
statusPromise.tryFailure(new AkkaProtocolException("Transport disassociated before handshake finished", null))
|
||||||
|
|
||||||
case StopEvent(reason, _, OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) ⇒
|
case StopEvent(reason, _, OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) ⇒
|
||||||
val msg = reason match {
|
val msg = reason match {
|
||||||
case FSM.Failure(TimeoutReason) ⇒ "No response from remote. Handshake timed out."
|
case FSM.Failure(TimeoutReason) ⇒ "No response from remote. Handshake timed out."
|
||||||
case _ ⇒ "Remote endpoint disassociated before handshake finished"
|
case _ ⇒ "Remote endpoint disassociated before handshake finished"
|
||||||
}
|
}
|
||||||
statusPromise.trySuccess(Fail(new AkkaProtocolException(msg, null)))
|
statusPromise.tryFailure(new AkkaProtocolException(msg, null))
|
||||||
wrappedHandle.disassociate()
|
wrappedHandle.disassociate()
|
||||||
|
|
||||||
case StopEvent(_, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒
|
case StopEvent(_, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒
|
||||||
|
|
@ -458,7 +456,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
||||||
readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
|
readHandlerPromise.future.map { HandleListenerRegistered(_) } pipeTo self
|
||||||
|
|
||||||
private def notifyOutboundHandler(wrappedHandle: AssociationHandle,
|
private def notifyOutboundHandler(wrappedHandle: AssociationHandle,
|
||||||
statusPromise: Promise[Status]): Future[HandleEventListener] = {
|
statusPromise: Promise[AssociationHandle]): Future[HandleEventListener] = {
|
||||||
val readHandlerPromise: Promise[HandleEventListener] = Promise()
|
val readHandlerPromise: Promise[HandleEventListener] = Promise()
|
||||||
listenForListenerRegistration(readHandlerPromise)
|
listenForListenerRegistration(readHandlerPromise)
|
||||||
|
|
||||||
|
|
@ -471,7 +469,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
||||||
self,
|
self,
|
||||||
codec)
|
codec)
|
||||||
|
|
||||||
statusPromise.success(Ready(exposedHandle))
|
statusPromise.success(exposedHandle)
|
||||||
readHandlerPromise.future
|
readHandlerPromise.future
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -71,18 +71,14 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
|
||||||
Future.successful(this)
|
Future.successful(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[Status]): Unit = {
|
protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = {
|
||||||
// Association is simulated to be failed if there was either an inbound or outbound message drop
|
// Association is simulated to be failed if there was either an inbound or outbound message drop
|
||||||
if (shouldDropInbound(remoteAddress) || shouldDropOutbound(remoteAddress))
|
if (shouldDropInbound(remoteAddress) || shouldDropOutbound(remoteAddress))
|
||||||
statusPromise.success(Fail(new FailureInjectorException("Simulated failure of association to " + remoteAddress)))
|
statusPromise.failure(new FailureInjectorException("Simulated failure of association to " + remoteAddress))
|
||||||
else
|
else
|
||||||
statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map {
|
statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map { handle ⇒
|
||||||
_ match {
|
addressChaosTable.putIfAbsent(handle.remoteAddress.copy(protocol = "", system = ""), PassThru)
|
||||||
case Ready(handle) ⇒
|
new FailureInjectorHandle(handle, this)
|
||||||
addressChaosTable.putIfAbsent(handle.remoteAddress.copy(protocol = "", system = ""), PassThru)
|
|
||||||
Ready(new FailureInjectorHandle(handle, this))
|
|
||||||
case s: Status ⇒ s
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,10 +44,10 @@ class TestTransport(
|
||||||
associationListenerPromise.future.onSuccess {
|
associationListenerPromise.future.onSuccess {
|
||||||
case listener: AssociationEventListener ⇒ registry.registerTransport(this, listener)
|
case listener: AssociationEventListener ⇒ registry.registerTransport(this, listener)
|
||||||
}
|
}
|
||||||
Promise.successful((localAddress, associationListenerPromise)).future
|
Future.successful((localAddress, associationListenerPromise))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def defaultAssociate(remoteAddress: Address): Future[Status] = {
|
private def defaultAssociate(remoteAddress: Address): Future[AssociationHandle] = {
|
||||||
registry.transportFor(remoteAddress) match {
|
registry.transportFor(remoteAddress) match {
|
||||||
|
|
||||||
case Some((remoteTransport, listener)) ⇒
|
case Some((remoteTransport, listener)) ⇒
|
||||||
|
|
@ -61,10 +61,10 @@ class TestTransport(
|
||||||
registry.registerListenerPair(localHandle.key, bothSides)
|
registry.registerListenerPair(localHandle.key, bothSides)
|
||||||
listener notify InboundAssociation(remoteHandle)
|
listener notify InboundAssociation(remoteHandle)
|
||||||
|
|
||||||
Promise.successful(Ready(localHandle)).future
|
Future.successful(localHandle)
|
||||||
|
|
||||||
case None ⇒
|
case None ⇒
|
||||||
Promise.successful(Fail(new IllegalArgumentException(s"No registered transport: $remoteAddress"))).future
|
Future.failed(new IllegalArgumentException(s"No registered transport: $remoteAddress"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -75,7 +75,7 @@ class TestTransport(
|
||||||
(localHandle, remoteHandle)
|
(localHandle, remoteHandle)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def defaultShutdown: Future[Unit] = Promise.successful(()).future
|
private def defaultShutdown: Future[Unit] = Future.successful(())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method.
|
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method.
|
||||||
|
|
@ -87,7 +87,7 @@ class TestTransport(
|
||||||
/**
|
/**
|
||||||
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method.
|
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method.
|
||||||
*/
|
*/
|
||||||
val associateBehavior = new SwitchableLoggedBehavior[Address, Status](
|
val associateBehavior = new SwitchableLoggedBehavior[Address, AssociationHandle](
|
||||||
defaultAssociate _,
|
defaultAssociate _,
|
||||||
(remoteAddress) ⇒ registry.logActivity(AssociateAttempt(localAddress, remoteAddress)))
|
(remoteAddress) ⇒ registry.logActivity(AssociateAttempt(localAddress, remoteAddress)))
|
||||||
|
|
||||||
|
|
@ -99,7 +99,7 @@ class TestTransport(
|
||||||
(_) ⇒ registry.logActivity(ShutdownAttempt(localAddress)))
|
(_) ⇒ registry.logActivity(ShutdownAttempt(localAddress)))
|
||||||
|
|
||||||
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior()
|
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior()
|
||||||
override def associate(remoteAddress: Address): Future[Status] = associateBehavior(remoteAddress)
|
override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress)
|
||||||
override def shutdown(): Unit = shutdownBehavior()
|
override def shutdown(): Unit = shutdownBehavior()
|
||||||
|
|
||||||
private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = {
|
private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = {
|
||||||
|
|
@ -111,7 +111,7 @@ class TestTransport(
|
||||||
}
|
}
|
||||||
writePromise.future
|
writePromise.future
|
||||||
case None ⇒
|
case None ⇒
|
||||||
Promise.failed(new IllegalStateException("No association present")).future
|
Future.failed(new IllegalStateException("No association present"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -123,7 +123,7 @@ class TestTransport(
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Promise.successful(()).future
|
Future.successful(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -210,7 +210,7 @@ object TestTransport {
|
||||||
* The constant the future will be completed with.
|
* The constant the future will be completed with.
|
||||||
*/
|
*/
|
||||||
def pushConstant(c: B): Unit = push {
|
def pushConstant(c: B): Unit = push {
|
||||||
(x) ⇒ Promise.successful(c).future
|
(x) ⇒ Future.successful(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -220,7 +220,7 @@ object TestTransport {
|
||||||
* The throwable the failed future will contain.
|
* The throwable the failed future will contain.
|
||||||
*/
|
*/
|
||||||
def pushError(e: Throwable): Unit = push {
|
def pushError(e: Throwable): Unit = push {
|
||||||
(x) ⇒ Promise.failed(e).future
|
(x) ⇒ Future.failed(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -332,8 +332,8 @@ object TestTransport {
|
||||||
* Indicates if all given transports were successfully registered. No associations can be established between
|
* Indicates if all given transports were successfully registered. No associations can be established between
|
||||||
* transports that are not yet registered.
|
* transports that are not yet registered.
|
||||||
*
|
*
|
||||||
* @param transports
|
* @param addresses
|
||||||
* The transports that participate in the test case.
|
* The listen addresses of transports that participate in the test case.
|
||||||
* @return
|
* @return
|
||||||
* True if all transports are successfully registered.
|
* True if all transports are successfully registered.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ import scala.annotation.tailrec
|
||||||
import scala.collection.immutable.Queue
|
import scala.collection.immutable.Queue
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
import scala.math.min
|
import scala.math.min
|
||||||
import scala.util.Success
|
import scala.util.{ Success, Failure }
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
|
@ -151,16 +151,20 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
|
||||||
val wrappedHandle = wrapHandle(handle, associationListener, inbound = true)
|
val wrappedHandle = wrapHandle(handle, associationListener, inbound = true)
|
||||||
wrappedHandle.throttlerActor ! wrappedHandle
|
wrappedHandle.throttlerActor ! wrappedHandle
|
||||||
case AssociateUnderlying(remoteAddress, statusPromise) ⇒
|
case AssociateUnderlying(remoteAddress, statusPromise) ⇒
|
||||||
wrappedTransport.associate(remoteAddress).onComplete {
|
wrappedTransport.associate(remoteAddress) onComplete {
|
||||||
case Success(Ready(handle)) ⇒
|
// Slight modification of pipe, only success is sent, failure is propagated to a separate future
|
||||||
val wrappedHandle = wrapHandle(handle, associationListener, inbound = false)
|
case Success(handle) ⇒ self ! (handle, statusPromise)
|
||||||
val inMode = getInboundMode(nakedAddress(remoteAddress))
|
case Failure(e) ⇒ statusPromise.failure(e)
|
||||||
wrappedHandle.outboundThrottleMode.set(getOutboundMode(nakedAddress(remoteAddress)))
|
|
||||||
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
|
|
||||||
handleTable ::= nakedAddress(remoteAddress) -> wrappedHandle
|
|
||||||
statusPromise.success(Ready(wrappedHandle))
|
|
||||||
case s @ _ ⇒ statusPromise.complete(s)
|
|
||||||
}
|
}
|
||||||
|
// Finished outbound association and got back the handle
|
||||||
|
case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒
|
||||||
|
val wrappedHandle = wrapHandle(handle, associationListener, inbound = false)
|
||||||
|
val naked = nakedAddress(handle.remoteAddress)
|
||||||
|
val inMode = getInboundMode(naked)
|
||||||
|
wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked))
|
||||||
|
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
|
||||||
|
handleTable ::= nakedAddress(naked) -> wrappedHandle
|
||||||
|
statusPromise.success(wrappedHandle)
|
||||||
case s @ SetThrottle(address, direction, mode) ⇒
|
case s @ SetThrottle(address, direction, mode) ⇒
|
||||||
val naked = nakedAddress(address)
|
val naked = nakedAddress(address)
|
||||||
throttlingModes += naked -> (mode, direction)
|
throttlingModes += naked -> (mode, direction)
|
||||||
|
|
|
||||||
|
|
@ -4,39 +4,17 @@ import scala.concurrent.{ Promise, Future }
|
||||||
import akka.actor.{ ActorRef, Address }
|
import akka.actor.{ ActorRef, Address }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import akka.remote.transport.AssociationHandle.HandleEventListener
|
import akka.remote.transport.AssociationHandle.HandleEventListener
|
||||||
|
import akka.AkkaException
|
||||||
|
|
||||||
object Transport {
|
object Transport {
|
||||||
|
|
||||||
trait AssociationEvent
|
trait AssociationEvent
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents fine grained status of an association attempt.
|
|
||||||
*/
|
|
||||||
sealed trait Status extends AssociationEvent
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address,
|
* Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address,
|
||||||
* hostname, etc.).
|
* hostname, etc.).
|
||||||
*/
|
*/
|
||||||
case class Invalid(cause: Throwable) extends Status
|
case class InvalidAssociationException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
|
||||||
|
|
||||||
/**
|
|
||||||
* The association setup has failed, but it is not known that a recovery is possible or not. Generally it means
|
|
||||||
* that the transport gave up its attempts to associate, but a retry might be successful at a later time.
|
|
||||||
*
|
|
||||||
* @param cause Cause of the failure
|
|
||||||
*/
|
|
||||||
case class Fail(cause: Throwable) extends Status
|
|
||||||
|
|
||||||
/**
|
|
||||||
* No detectable errors happened during association. Generally a status of Ready does not guarantee that the
|
|
||||||
* association was successful. For example in the case of UDP, the transport MAY return Ready immediately after an
|
|
||||||
* association setup was requested.
|
|
||||||
*
|
|
||||||
* @param association
|
|
||||||
* The handle for the created association.
|
|
||||||
*/
|
|
||||||
case class Ready(association: AssociationHandle) extends Status
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Message sent to a [[akka.remote.transport.Transport.AssociationEventListener]] registered to a transport
|
* Message sent to a [[akka.remote.transport.Transport.AssociationEventListener]] registered to a transport
|
||||||
|
|
@ -125,15 +103,16 @@ trait Transport {
|
||||||
* real transport-layer connection (TCP), more lightweight connections provided over datagram protocols (UDP with
|
* real transport-layer connection (TCP), more lightweight connections provided over datagram protocols (UDP with
|
||||||
* additional services), substreams of multiplexed connections (SCTP) or physical links (serial port).
|
* additional services), substreams of multiplexed connections (SCTP) or physical links (serial port).
|
||||||
*
|
*
|
||||||
* This call returns a fine-grained status indication of the attempt wrapped in a Future. See
|
* This call returns a future of an [[akka.remote.transport.AssociationHandle]]. A failed future indicates that
|
||||||
* [[akka.remote.transport.Transport.Status]] for details.
|
* the association attempt was unsuccessful. If the exception is [[akka.remote.transport.Transport.InvalidAssociationException]]
|
||||||
|
* then the association request was invalid, and it is impossible to recover.
|
||||||
*
|
*
|
||||||
* @param remoteAddress
|
* @param remoteAddress
|
||||||
* The address of the remote transport entity.
|
* The address of the remote transport entity.
|
||||||
* @return
|
* @return
|
||||||
* A status instance representing failure or a success containing an [[akka.remote.transport.AssociationHandle]]
|
* A status instance representing failure or a success containing an [[akka.remote.transport.AssociationHandle]]
|
||||||
*/
|
*/
|
||||||
def associate(remoteAddress: Address): Future[Status]
|
def associate(remoteAddress: Address): Future[AssociationHandle]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shuts down the transport layer and releases all the corresponding resources. Shutdown is asynchronous, may be
|
* Shuts down the transport layer and releases all the corresponding resources. Shutdown is asynchronous, may be
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,9 @@ object NettyTransportSettings {
|
||||||
case object Udp extends Mode { override def toString = "udp" }
|
case object Udp extends Mode { override def toString = "udp" }
|
||||||
}
|
}
|
||||||
|
|
||||||
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)
|
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) {
|
||||||
|
def this(msg: String) = this(msg, null)
|
||||||
|
}
|
||||||
|
|
||||||
class NettyTransportSettings(config: Config) {
|
class NettyTransportSettings(config: Config) {
|
||||||
|
|
||||||
|
|
@ -137,12 +139,12 @@ abstract class ServerHandler(protected final val transport: NettyTransport,
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class ClientHandler(protected final val transport: NettyTransport,
|
abstract class ClientHandler(protected final val transport: NettyTransport,
|
||||||
private final val statusPromise: Promise[Status])
|
private final val statusPromise: Promise[AssociationHandle])
|
||||||
extends NettyClientHelpers with CommonHandlers {
|
extends NettyClientHelpers with CommonHandlers {
|
||||||
|
|
||||||
final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = {
|
final protected def initOutbound(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit = {
|
||||||
channel.setReadable(false)
|
channel.setReadable(false)
|
||||||
init(channel, remoteSocketAddress, msg) { handle ⇒ statusPromise.success(Ready(handle)) }
|
init(channel, remoteSocketAddress, msg)(statusPromise.success)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -227,7 +229,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def clientPipelineFactory(statusPromise: Promise[Status]): ChannelPipelineFactory = new ChannelPipelineFactory {
|
private def clientPipelineFactory(statusPromise: Promise[AssociationHandle]): ChannelPipelineFactory = new ChannelPipelineFactory {
|
||||||
override def getPipeline: ChannelPipeline = {
|
override def getPipeline: ChannelPipeline = {
|
||||||
val pipeline = newPipeline
|
val pipeline = newPipeline
|
||||||
if (EnableSsl) pipeline.addFirst("SslHandler", NettySSLSupport(settings.SslSettings.get, log, true))
|
if (EnableSsl) pipeline.addFirst("SslHandler", NettySSLSupport(settings.SslSettings.get, log, true))
|
||||||
|
|
@ -258,7 +260,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
|
||||||
case Udp ⇒ setupBootstrap(new ConnectionlessBootstrap(serverChannelFactory), serverPipelineFactory)
|
case Udp ⇒ setupBootstrap(new ConnectionlessBootstrap(serverChannelFactory), serverPipelineFactory)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def outboundBootstrap(statusPromise: Promise[Status]): ClientBootstrap = {
|
private def outboundBootstrap(statusPromise: Promise[AssociationHandle]): ClientBootstrap = {
|
||||||
val bootstrap = setupBootstrap(new ClientBootstrap(clientChannelFactory), clientPipelineFactory(statusPromise))
|
val bootstrap = setupBootstrap(new ClientBootstrap(clientChannelFactory), clientPipelineFactory(statusPromise))
|
||||||
bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis)
|
bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis)
|
||||||
bootstrap
|
bootstrap
|
||||||
|
|
@ -307,7 +309,7 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
|
||||||
|
|
||||||
case None ⇒
|
case None ⇒
|
||||||
listenPromise.failure(
|
listenPromise.failure(
|
||||||
new NettyTransportException(s"Unknown local address type ${serverChannel.getLocalAddress.getClass}", null))
|
new NettyTransportException(s"Unknown local address type ${serverChannel.getLocalAddress.getClass}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch {
|
} catch {
|
||||||
|
|
@ -317,58 +319,61 @@ class NettyTransport(private val settings: NettyTransportSettings, private val s
|
||||||
listenPromise.future
|
listenPromise.future
|
||||||
}
|
}
|
||||||
|
|
||||||
override def associate(remoteAddress: Address): Future[Status] = {
|
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
if (!serverChannel.isBound) statusPromise.success(Fail(new NettyTransportException("Transport is not bound", null)))
|
if (!serverChannel.isBound) statusPromise.failure(new NettyTransportException("Transport is not bound"))
|
||||||
|
else {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!isDatagram) {
|
if (!isDatagram) {
|
||||||
val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))
|
val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))
|
||||||
|
|
||||||
connectFuture.addListener(new ChannelFutureListener {
|
connectFuture.addListener(new ChannelFutureListener {
|
||||||
override def operationComplete(future: ChannelFuture) {
|
override def operationComplete(future: ChannelFuture) {
|
||||||
if (!future.isSuccess)
|
if (!future.isSuccess)
|
||||||
statusPromise.failure(future.getCause)
|
statusPromise.failure(future.getCause)
|
||||||
else if (future.isCancelled)
|
else if (future.isCancelled)
|
||||||
statusPromise.failure(new NettyTransportException("Connection was cancelled", null))
|
statusPromise.failure(new NettyTransportException("Connection was cancelled"))
|
||||||
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))
|
val connectFuture = outboundBootstrap(statusPromise).connect(addressToSocketAddress(remoteAddress))
|
||||||
|
|
||||||
connectFuture.addListener(new ChannelFutureListener {
|
connectFuture.addListener(new ChannelFutureListener {
|
||||||
def operationComplete(future: ChannelFuture) {
|
def operationComplete(future: ChannelFuture) {
|
||||||
if (!future.isSuccess)
|
if (!future.isSuccess)
|
||||||
statusPromise.failure(future.getCause)
|
statusPromise.failure(future.getCause)
|
||||||
else if (future.isCancelled)
|
else if (future.isCancelled)
|
||||||
statusPromise.failure(new NettyTransportException("Connection was cancelled", null))
|
statusPromise.failure(new NettyTransportException("Connection was cancelled"))
|
||||||
else {
|
else {
|
||||||
val handle: UdpAssociationHandle = new UdpAssociationHandle(localAddress, remoteAddress, future.getChannel, NettyTransport.this)
|
val handle: UdpAssociationHandle =
|
||||||
|
new UdpAssociationHandle(localAddress, remoteAddress, future.getChannel, NettyTransport.this)
|
||||||
|
|
||||||
future.getChannel.getRemoteAddress match {
|
future.getChannel.getRemoteAddress match {
|
||||||
case addr: InetSocketAddress ⇒
|
case addr: InetSocketAddress ⇒
|
||||||
statusPromise.success(Ready(handle))
|
statusPromise.success(handle)
|
||||||
handle.readHandlerPromise.future.onSuccess {
|
handle.readHandlerPromise.future.onSuccess {
|
||||||
case listener: HandleEventListener ⇒ udpConnectionTable.put(addr, listener)
|
case listener: HandleEventListener ⇒ udpConnectionTable.put(addr, listener)
|
||||||
}
|
}
|
||||||
case a @ _ ⇒ statusPromise.success(Fail(
|
case a ⇒ statusPromise.failure(
|
||||||
new NettyTransportException("Unknown remote address type " + a.getClass, null)))
|
new NettyTransportException("Unknown remote address type " + a.getClass))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
})
|
}
|
||||||
|
|
||||||
|
} catch {
|
||||||
|
|
||||||
|
case e @ (_: UnknownHostException | _: SecurityException | _: IllegalArgumentException) ⇒
|
||||||
|
statusPromise.failure(InvalidAssociationException("Invalid association ", e))
|
||||||
|
|
||||||
|
case NonFatal(e) ⇒
|
||||||
|
statusPromise.failure(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch {
|
|
||||||
|
|
||||||
case e @ (_: UnknownHostException | _: SecurityException | _: IllegalArgumentException) ⇒
|
|
||||||
statusPromise.success(Invalid(e))
|
|
||||||
|
|
||||||
case NonFatal(e) ⇒
|
|
||||||
statusPromise.success(Fail(e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
statusPromise.future
|
statusPromise.future
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package akka.remote.transport.netty
|
||||||
import akka.actor.Address
|
import akka.actor.Address
|
||||||
import akka.remote.transport.AssociationHandle
|
import akka.remote.transport.AssociationHandle
|
||||||
import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener, Disassociated, InboundPayload }
|
import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener, Disassociated, InboundPayload }
|
||||||
import akka.remote.transport.Transport.{ AssociationEventListener, Status }
|
import akka.remote.transport.Transport.AssociationEventListener
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
|
import org.jboss.netty.buffer.{ ChannelBuffers, ChannelBuffer }
|
||||||
|
|
@ -50,7 +50,7 @@ private[remote] class TcpServerHandler(_transport: NettyTransport, _associationL
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private[remote] class TcpClientHandler(_transport: NettyTransport, _statusPromise: Promise[Status])
|
private[remote] class TcpClientHandler(_transport: NettyTransport, _statusPromise: Promise[AssociationHandle])
|
||||||
extends ClientHandler(_transport, _statusPromise) with TcpHandlers {
|
extends ClientHandler(_transport, _statusPromise) with TcpHandlers {
|
||||||
|
|
||||||
override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
|
override def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ package akka.remote.transport.netty
|
||||||
import akka.actor.Address
|
import akka.actor.Address
|
||||||
import akka.remote.transport.AssociationHandle
|
import akka.remote.transport.AssociationHandle
|
||||||
import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload }
|
import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload }
|
||||||
import akka.remote.transport.Transport.{ AssociationEventListener, Status }
|
import akka.remote.transport.Transport.AssociationEventListener
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import java.net.{ SocketAddress, InetAddress, InetSocketAddress }
|
import java.net.{ SocketAddress, InetAddress, InetSocketAddress }
|
||||||
import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
|
import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
|
||||||
|
|
@ -49,7 +49,7 @@ private[remote] class UdpServerHandler(_transport: NettyTransport, _associationL
|
||||||
initInbound(channel, remoteSocketAddress, msg)
|
initInbound(channel, remoteSocketAddress, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[remote] class UdpClientHandler(_transport: NettyTransport, _statusPromise: Promise[Status])
|
private[remote] class UdpClientHandler(_transport: NettyTransport, _statusPromise: Promise[AssociationHandle])
|
||||||
extends ClientHandler(_transport, _statusPromise) with UdpHandlers {
|
extends ClientHandler(_transport, _statusPromise) with UdpHandlers {
|
||||||
|
|
||||||
override def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit =
|
override def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit =
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,10 @@ akka.loglevel = DEBUG
|
||||||
val target1 = other.actorFor(RootActorPath(addr) / "remote")
|
val target1 = other.actorFor(RootActorPath(addr) / "remote")
|
||||||
val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements)
|
val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements)
|
||||||
|
|
||||||
|
override def atTermination() {
|
||||||
|
other.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
// need to enable debug log-level without actually printing those messages
|
// need to enable debug log-level without actually printing those messages
|
||||||
system.eventStream.publish(TestEvent.Mute(EventFilter.debug()))
|
system.eventStream.publish(TestEvent.Mute(EventFilter.debug()))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -189,9 +189,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
|
|
||||||
"serve the handle as soon as possible if WaitActivity is turned off" in {
|
"serve the handle as soon as possible if WaitActivity is turned off" in {
|
||||||
val (failureDetector, registry, transport, handle) = collaborators
|
val (failureDetector, registry, transport, handle) = collaborators
|
||||||
transport.associateBehavior.pushConstant(Transport.Ready(handle))
|
transport.associateBehavior.pushConstant(handle)
|
||||||
|
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
system.actorOf(Props(new ProtocolStateActor(
|
system.actorOf(Props(new ProtocolStateActor(
|
||||||
localAddress,
|
localAddress,
|
||||||
|
|
@ -203,7 +203,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
failureDetector)))
|
failureDetector)))
|
||||||
|
|
||||||
Await.result(statusPromise.future, 3 seconds) match {
|
Await.result(statusPromise.future, 3 seconds) match {
|
||||||
case Transport.Ready(h) ⇒
|
case h: AssociationHandle ⇒
|
||||||
h.remoteAddress must be === remoteAkkaAddress
|
h.remoteAddress must be === remoteAkkaAddress
|
||||||
h.localAddress must be === localAkkaAddress
|
h.localAddress must be === localAkkaAddress
|
||||||
|
|
||||||
|
|
@ -217,9 +217,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
|
|
||||||
"in outbound mode with WaitActivity delay readiness until activity detected" in {
|
"in outbound mode with WaitActivity delay readiness until activity detected" in {
|
||||||
val (failureDetector, registry, transport, handle) = collaborators
|
val (failureDetector, registry, transport, handle) = collaborators
|
||||||
transport.associateBehavior.pushConstant(Transport.Ready(handle))
|
transport.associateBehavior.pushConstant(handle)
|
||||||
|
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
val reader = system.actorOf(Props(new ProtocolStateActor(
|
val reader = system.actorOf(Props(new ProtocolStateActor(
|
||||||
localAddress,
|
localAddress,
|
||||||
|
|
@ -242,7 +242,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
reader ! testPayload
|
reader ! testPayload
|
||||||
|
|
||||||
Await.result(statusPromise.future, 3 seconds) match {
|
Await.result(statusPromise.future, 3 seconds) match {
|
||||||
case Transport.Ready(h) ⇒
|
case h: AssociationHandle ⇒
|
||||||
h.remoteAddress must be === remoteAkkaAddress
|
h.remoteAddress must be === remoteAkkaAddress
|
||||||
h.localAddress must be === localAkkaAddress
|
h.localAddress must be === localAkkaAddress
|
||||||
|
|
||||||
|
|
@ -298,9 +298,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
|
|
||||||
"send cookie in Associate PDU if configured to do so" in {
|
"send cookie in Associate PDU if configured to do so" in {
|
||||||
val (failureDetector, registry, transport, handle) = collaborators
|
val (failureDetector, registry, transport, handle) = collaborators
|
||||||
transport.associateBehavior.pushConstant(Transport.Ready(handle))
|
transport.associateBehavior.pushConstant(handle)
|
||||||
|
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
system.actorOf(Props(new ProtocolStateActor(
|
system.actorOf(Props(new ProtocolStateActor(
|
||||||
localAddress,
|
localAddress,
|
||||||
|
|
@ -316,7 +316,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
failureDetector)))
|
failureDetector)))
|
||||||
|
|
||||||
Await.result(statusPromise.future, 3 seconds) match {
|
Await.result(statusPromise.future, 3 seconds) match {
|
||||||
case Transport.Ready(h) ⇒
|
case h: AssociationHandle ⇒
|
||||||
h.remoteAddress must be === remoteAkkaAddress
|
h.remoteAddress must be === remoteAkkaAddress
|
||||||
h.localAddress must be === localAkkaAddress
|
h.localAddress must be === localAkkaAddress
|
||||||
|
|
||||||
|
|
@ -328,9 +328,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
|
|
||||||
"handle explicit disassociate messages" in {
|
"handle explicit disassociate messages" in {
|
||||||
val (failureDetector, registry, transport, handle) = collaborators
|
val (failureDetector, registry, transport, handle) = collaborators
|
||||||
transport.associateBehavior.pushConstant(Transport.Ready(handle))
|
transport.associateBehavior.pushConstant(handle)
|
||||||
|
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
val reader = system.actorOf(Props(new ProtocolStateActor(
|
val reader = system.actorOf(Props(new ProtocolStateActor(
|
||||||
localAddress,
|
localAddress,
|
||||||
|
|
@ -342,7 +342,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
failureDetector)))
|
failureDetector)))
|
||||||
|
|
||||||
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
||||||
case Transport.Ready(h) ⇒
|
case h: AssociationHandle ⇒
|
||||||
h.remoteAddress must be === remoteAkkaAddress
|
h.remoteAddress must be === remoteAkkaAddress
|
||||||
h.localAddress must be === localAkkaAddress
|
h.localAddress must be === localAkkaAddress
|
||||||
h
|
h
|
||||||
|
|
@ -361,9 +361,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
|
|
||||||
"handle transport level disassociations" in {
|
"handle transport level disassociations" in {
|
||||||
val (failureDetector, registry, transport, handle) = collaborators
|
val (failureDetector, registry, transport, handle) = collaborators
|
||||||
transport.associateBehavior.pushConstant(Transport.Ready(handle))
|
transport.associateBehavior.pushConstant(handle)
|
||||||
|
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
val reader = system.actorOf(Props(new ProtocolStateActor(
|
val reader = system.actorOf(Props(new ProtocolStateActor(
|
||||||
localAddress,
|
localAddress,
|
||||||
|
|
@ -380,7 +380,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
reader ! testHeartbeat
|
reader ! testHeartbeat
|
||||||
|
|
||||||
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
||||||
case Transport.Ready(h) ⇒
|
case h: AssociationHandle ⇒
|
||||||
h.remoteAddress must be === remoteAkkaAddress
|
h.remoteAddress must be === remoteAkkaAddress
|
||||||
h.localAddress must be === localAkkaAddress
|
h.localAddress must be === localAkkaAddress
|
||||||
h
|
h
|
||||||
|
|
@ -399,9 +399,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
|
|
||||||
"disassociate when failure detector signals failure" in {
|
"disassociate when failure detector signals failure" in {
|
||||||
val (failureDetector, registry, transport, handle) = collaborators
|
val (failureDetector, registry, transport, handle) = collaborators
|
||||||
transport.associateBehavior.pushConstant(Transport.Ready(handle))
|
transport.associateBehavior.pushConstant(handle)
|
||||||
|
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
system.actorOf(Props(new ProtocolStateActor(
|
system.actorOf(Props(new ProtocolStateActor(
|
||||||
localAddress,
|
localAddress,
|
||||||
|
|
@ -413,7 +413,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
failureDetector)))
|
failureDetector)))
|
||||||
|
|
||||||
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
||||||
case Transport.Ready(h) ⇒
|
case h: AssociationHandle ⇒
|
||||||
h.remoteAddress must be === remoteAkkaAddress
|
h.remoteAddress must be === remoteAkkaAddress
|
||||||
h.localAddress must be === localAkkaAddress
|
h.localAddress must be === localAkkaAddress
|
||||||
h
|
h
|
||||||
|
|
@ -435,9 +435,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
|
|
||||||
"handle correctly when the handler is registered only after the association is already closed" in {
|
"handle correctly when the handler is registered only after the association is already closed" in {
|
||||||
val (failureDetector, _, transport, handle) = collaborators
|
val (failureDetector, _, transport, handle) = collaborators
|
||||||
transport.associateBehavior.pushConstant(Transport.Ready(handle))
|
transport.associateBehavior.pushConstant(handle)
|
||||||
|
|
||||||
val statusPromise: Promise[Status] = Promise()
|
val statusPromise: Promise[AssociationHandle] = Promise()
|
||||||
|
|
||||||
val stateActor = system.actorOf(Props(new ProtocolStateActor(
|
val stateActor = system.actorOf(Props(new ProtocolStateActor(
|
||||||
localAddress,
|
localAddress,
|
||||||
|
|
@ -449,7 +449,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
|
||||||
failureDetector)))
|
failureDetector)))
|
||||||
|
|
||||||
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
val wrappedHandle = Await.result(statusPromise.future, 3 seconds) match {
|
||||||
case Transport.Ready(h) ⇒
|
case h: AssociationHandle ⇒
|
||||||
h.remoteAddress must be === remoteAkkaAddress
|
h.remoteAddress must be === remoteAkkaAddress
|
||||||
h.localAddress must be === localAkkaAddress
|
h.localAddress must be === localAkkaAddress
|
||||||
h
|
h
|
||||||
|
|
|
||||||
|
|
@ -8,13 +8,7 @@ import akka.testkit.{ ImplicitSender, DefaultTimeout, AkkaSpec }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import scala.concurrent.{ Future, Await }
|
import scala.concurrent.{ Future, Await }
|
||||||
import akka.remote.RemoteActorRefProvider
|
import akka.remote.RemoteActorRefProvider
|
||||||
import akka.remote.transport.Transport.InboundAssociation
|
import akka.remote.transport.TestTransport.{ DisassociateAttempt, WriteAttempt, ListenAttempt, AssociateAttempt }
|
||||||
import akka.remote.transport.TestTransport.DisassociateAttempt
|
|
||||||
import akka.remote.transport.TestTransport.WriteAttempt
|
|
||||||
import akka.remote.transport.TestTransport.ListenAttempt
|
|
||||||
import akka.remote.transport.Transport.Fail
|
|
||||||
import akka.remote.transport.TestTransport.AssociateAttempt
|
|
||||||
import akka.remote.transport.Transport.Ready
|
|
||||||
|
|
||||||
abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
|
abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
|
||||||
extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """)
|
extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """)
|
||||||
|
|
@ -85,10 +79,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
|
||||||
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
|
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
|
||||||
awaitCond(registry.transportsReady(addressATest))
|
awaitCond(registry.transportsReady(addressATest))
|
||||||
|
|
||||||
Await.result(transportA.associate(nonExistingAddress), timeout.duration) match {
|
// TestTransport throws IllegalArgumentException when trying to associate with non-existing system
|
||||||
case Fail(_) ⇒
|
intercept[IllegalArgumentException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) }
|
||||||
case _ ⇒ fail()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully send PDUs" in {
|
"successfully send PDUs" in {
|
||||||
|
|
@ -101,12 +93,12 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
|
||||||
|
|
||||||
awaitCond(registry.transportsReady(addressATest, addressBTest))
|
awaitCond(registry.transportsReady(addressATest, addressBTest))
|
||||||
|
|
||||||
val associate: Future[Status] = transportA.associate(addressB)
|
val associate: Future[AssociationHandle] = transportA.associate(addressB)
|
||||||
val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
||||||
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
||||||
}
|
}
|
||||||
|
|
||||||
val Ready(handleA) = Await.result(associate, timeout.duration)
|
val handleA = Await.result(associate, timeout.duration)
|
||||||
|
|
||||||
// Initialize handles
|
// Initialize handles
|
||||||
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
||||||
|
|
@ -139,12 +131,12 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
|
||||||
|
|
||||||
awaitCond(registry.transportsReady(addressATest, addressBTest))
|
awaitCond(registry.transportsReady(addressATest, addressBTest))
|
||||||
|
|
||||||
val associate: Future[Status] = transportA.associate(addressB)
|
val associate: Future[AssociationHandle] = transportA.associate(addressB)
|
||||||
val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
||||||
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
||||||
}
|
}
|
||||||
|
|
||||||
val Ready(handleA) = Await.result(associate, timeout.duration)
|
val handleA = Await.result(associate, timeout.duration)
|
||||||
|
|
||||||
// Initialize handles
|
// Initialize handles
|
||||||
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
||||||
|
|
|
||||||
|
|
@ -55,10 +55,10 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
|
||||||
var transportA = new TestTransport(addressA, registry)
|
var transportA = new TestTransport(addressA, registry)
|
||||||
|
|
||||||
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
|
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
|
||||||
Await.result(transportA.associate(nonExistingAddress), timeout.duration) match {
|
|
||||||
case Fail(_) ⇒
|
// TestTransport throws IllegalArgumentException when trying to associate with non-existing system
|
||||||
case _ ⇒ fail()
|
intercept[IllegalArgumentException] { Await.result(transportA.associate(nonExistingAddress), timeout.duration) }
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"emulate sending PDUs and logs write" in {
|
"emulate sending PDUs and logs write" in {
|
||||||
|
|
@ -71,12 +71,12 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
|
||||||
|
|
||||||
awaitCond(registry.transportsReady(addressA, addressB))
|
awaitCond(registry.transportsReady(addressA, addressB))
|
||||||
|
|
||||||
val associate: Future[Status] = transportA.associate(addressB)
|
val associate: Future[AssociationHandle] = transportA.associate(addressB)
|
||||||
val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
val handleB = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
||||||
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
||||||
}
|
}
|
||||||
|
|
||||||
val Ready(handleA) = Await.result(associate, timeout.duration)
|
val handleA = Await.result(associate, timeout.duration)
|
||||||
|
|
||||||
// Initialize handles
|
// Initialize handles
|
||||||
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
||||||
|
|
@ -108,12 +108,12 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
|
||||||
|
|
||||||
awaitCond(registry.transportsReady(addressA, addressB))
|
awaitCond(registry.transportsReady(addressA, addressB))
|
||||||
|
|
||||||
val associate: Future[Status] = transportA.associate(addressB)
|
val associate: Future[AssociationHandle] = transportA.associate(addressB)
|
||||||
val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
val handleB: AssociationHandle = expectMsgPF(timeout.duration, "Expect InboundAssociation from A") {
|
||||||
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
case InboundAssociation(handle) if handle.remoteAddress == addressA ⇒ handle
|
||||||
}
|
}
|
||||||
|
|
||||||
val Ready(handleA) = Await.result(associate, timeout.duration)
|
val handleA = Await.result(associate, timeout.duration)
|
||||||
|
|
||||||
// Initialize handles
|
// Initialize handles
|
||||||
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
|
||||||
|
|
|
||||||
|
|
@ -5,11 +5,7 @@ import akka.actor._
|
||||||
import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
|
import akka.testkit.{ TimingTest, DefaultTimeout, ImplicitSender, AkkaSpec }
|
||||||
import ThrottlerTransportAdapterSpec._
|
import ThrottlerTransportAdapterSpec._
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.remote.transport.TestTransport.{ WriteAttempt, AssociationRegistry }
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.{ Promise, Future, Await }
|
|
||||||
import akka.remote.transport.Transport.{ Ready, InboundAssociation, Status }
|
|
||||||
import akka.util.ByteString
|
|
||||||
import akka.remote.transport.AssociationHandle.InboundPayload
|
|
||||||
import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle }
|
import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, TokenBucket, SetThrottle }
|
||||||
import akka.remote.RemoteActorRefProvider
|
import akka.remote.RemoteActorRefProvider
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue