From 4c7691fe40c8a7ddb7dc214581bf0c41b039f2ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 18 Dec 2012 12:54:17 +0100 Subject: [PATCH] Removed implicit conversions from ActorRef to AssociationEventListener and HandleEventListener --- .../src/main/scala/akka/remote/Endpoint.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 11 +++--- .../src/main/scala/akka/remote/Remoting.scala | 5 +-- .../transport/AbstractTransportAdapter.scala | 9 +++-- .../transport/AkkaProtocolTransport.scala | 4 +- .../FailureInjectorTransportAdapter.scala | 11 ++++-- .../transport/ThrottlerTransportAdapter.scala | 6 +-- .../akka/remote/transport/Transport.scala | 4 -- .../scala/akka/remote/RemoteConfigSpec.scala | 2 +- .../remote/transport/AkkaProtocolSpec.scala | 24 ++++++------ .../transport/AkkaProtocolStressTest.scala | 2 +- .../transport/GenericTransportSpec.scala | 37 ++++++++++--------- .../remote/transport/TestTransportSpec.scala | 24 ++++++------ 13 files changed, 72 insertions(+), 69 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 3bd3514e01..b6764f3465 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -248,7 +248,7 @@ private[remote] class EndpointWriter( reader = Some( context.watch(context.actorOf(Props(new EndpointReader(readerCodec, readerLocalAddress, readerDispatcher)), "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))) - h.readHandlerPromise.success(reader.get) + h.readHandlerPromise.success(ActorHandleEventListener(reader.get)) case None ⇒ throw new EndpointException("Internal error: No handle was present during creation of the endpoint" + "reader.", null) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 8467a96d73..02d8b5d837 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -26,13 +26,13 @@ object RemoteActorRefProvider { private class RemotingTerminator extends Actor with FSM[TerminatorState, Option[Internals]] { import context.dispatcher - val systemGuardian = context.system.asInstanceOf[ExtendedActorSystem].provider.systemGuardian + val systemGuardian = context.actorFor("/system") startWith(Uninitialized, None) when(Uninitialized) { case Event(i: Internals, _) ⇒ - systemGuardian.tell(RegisterTerminationHook, self) + systemGuardian ! RegisterTerminationHook goto(Idle) using Some(i) } @@ -46,8 +46,7 @@ object RemoteActorRefProvider { // TODO: state timeout when(WaitDaemonShutdown) { case Event(TerminationHookDone, Some(internals)) ⇒ - log.info("Remote daemon shut down.") - log.info("Shutting down remoting.") + log.info("Remote daemon shut down; proceeding with flushing remote transports.") internals.transport.shutdown() pipeTo self goto(WaitTransportShutdown) } @@ -55,7 +54,7 @@ object RemoteActorRefProvider { when(WaitTransportShutdown) { case Event((), _) ⇒ log.info("Remoting shut down.") - systemGuardian.tell(TerminationHookDone, self) + systemGuardian ! TerminationHookDone stop() } @@ -141,8 +140,8 @@ class RemoteActorRefProvider( }).get }) - remotingTerminator ! internals _internals = internals + remotingTerminator ! internals _log = Logging(eventStream, "RemoteActorRefProvider") diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1ad24f3622..40f3ff0320 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -8,8 +8,7 @@ import akka.japi.Util.immutableSeq import akka.pattern.{ gracefulStop, pipe, ask } import akka.remote.EndpointManager._ import akka.remote.Remoting.TransportSupervisor -import akka.remote.transport.Transport.AssociationEventListener -import akka.remote.transport.Transport.InboundAssociation +import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation } import akka.remote.transport._ import akka.util.Timeout import com.typesafe.config.Config @@ -386,7 +385,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends // Register to each transport as listener and collect mapping to addresses val transportsAndAddresses = results map { case (transport, address, promise) ⇒ - promise.success(self) + promise.success(ActorAssociationEventListener(self)) transport -> address } addressesPromise.success(transportsAndAddresses) diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 81fc6702be..f353d2889d 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -54,7 +54,7 @@ trait SchemeAugmenter { /** * An adapter that wraps a transport and provides interception */ -abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport, implicit val ec: ExecutionContext) +abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport)(implicit val ec: ExecutionContext) extends Transport with SchemeAugmenter { protected def maximumOverhead: Int @@ -125,7 +125,7 @@ object ActorTransportAdapter { } abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem) - extends AbstractTransportAdapter(wrappedTransport, system.dispatcher) { + extends AbstractTransportAdapter(wrappedTransport)(system.dispatcher) { import ActorTransportAdapter._ private implicit val timeout = new Timeout(3 seconds) @@ -140,9 +140,12 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS override def interceptListen(listenAddress: Address, listenerPromise: Future[AssociationEventListener]): Future[AssociationEventListener] = { registerManager().map { mgr ⇒ + // Side effecting: storing the manager instance in volatile var + // This is done only once: during the initialization of the protocol stack. The variable manager is not read + // before listen is called. manager = mgr manager ! ListenUnderlying(listenAddress, listenerPromise) - manager + ActorAssociationEventListener(manager) } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index d6a84cbdf3..78cb7ceaff 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -280,7 +280,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat startWith(Closed, d) case d: InboundUnassociated ⇒ - d.wrappedHandle.readHandlerPromise.success(self) + d.wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self)) startWith(WaitActivity, d) } @@ -296,7 +296,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat stop() case Event(Ready(wrappedHandle), OutboundUnassociated(_, statusPromise, _)) ⇒ - wrappedHandle.readHandlerPromise.success(self) + wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self)) sendAssociate(wrappedHandle) failureDetector.heartbeat() initTimers() diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index c9c98dfe05..0ad253e186 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -33,7 +33,7 @@ private[remote] object FailureInjectorTransportAdapter { } private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem) - extends AbstractTransportAdapter(wrappedTransport, extendedSystem.dispatcher) with AssociationEventListener { + extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) with AssociationEventListener { import extendedSystem.dispatcher @@ -62,6 +62,9 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor listenerFuture: Future[AssociationEventListener]): Future[AssociationEventListener] = { log.warning("FailureInjectorTransport is active on this system. Gremlins might munch your packets.") listenerFuture.onSuccess { + // Side effecting: As this class is not an actor, the only way to safely modify state is through volatile vars. + // Listen is called only during the initialization of the stack, and upstreamListener is not read before this + // finishes. case listener: AssociationEventListener ⇒ upstreamListener = Some(listener) } Future.successful(this) @@ -126,9 +129,9 @@ private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHand wrappedHandle.readHandlerPromise.success(this) } - override def write(payload: ByteString): Boolean = if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress)) - wrappedHandle.write(payload) - else true + override def write(payload: ByteString): Boolean = + if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress)) wrappedHandle.write(payload) + else true override def disassociate(): Unit = wrappedHandle.disassociate() diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 77c311fe60..ee36be8581 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -7,7 +7,7 @@ import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.ActorTransportAdapter.ListenUnderlying import akka.remote.transport.ActorTransportAdapter.ListenerRegistered import akka.remote.transport.AkkaPduCodec.Associate -import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload, HandleEventListener } +import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } import akka.remote.transport.ThrottledAssociation._ import akka.remote.transport.ThrottlerManager.Checkin import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle @@ -255,14 +255,14 @@ private[transport] class ThrottledAssociation( override def postStop(): Unit = originalHandle.disassociate() if (inbound) startWith(WaitExposedHandle, Uninitialized) else { - originalHandle.readHandlerPromise.success(self) + originalHandle.readHandlerPromise.success(ActorHandleEventListener(self)) startWith(WaitModeAndUpstreamListener, Uninitialized) } when(WaitExposedHandle) { case Event(handle: ThrottlerHandle, Uninitialized) ⇒ // register to downstream layer and wait for origin - originalHandle.readHandlerPromise.success(self) + originalHandle.readHandlerPromise.success(ActorHandleEventListener(self)) goto(WaitOrigin) using ExposedHandle(handle) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 4cdc2f47db..6e3be7aa5d 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -68,8 +68,6 @@ object Transport { override def notify(ev: AssociationEvent): Unit = actor ! ev } - implicit def actorRef2HandleEventListener(actor: ActorRef): AssociationEventListener = - ActorAssociationEventListener(actor) } /** @@ -199,8 +197,6 @@ object AssociationHandle { case class ActorHandleEventListener(actor: ActorRef) extends HandleEventListener { override def notify(ev: HandleEvent): Unit = actor ! ev } - - implicit def actorRef2HandleEventListener(actor: ActorRef): HandleEventListener = ActorHandleEventListener(actor) } /** diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 06a0882528..f1d9ac2397 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -19,7 +19,7 @@ class RemoteConfigSpec extends AkkaSpec( } """) { - // These tests are ignored as it tests configuration specific to the old remoting. + // FIXME: These tests are ignored as it tests configuration specific to the old remoting. "Remoting" must { "be able to parse generic remote config elements" ignore { diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 133be9d546..ce630c4ac7 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -3,7 +3,7 @@ package akka.remote.transport import akka.actor.{ ExtendedActorSystem, Address, Props } import akka.remote.transport.AkkaPduCodec.{ Disassociate, Associate, Heartbeat } import akka.remote.transport.AkkaProtocolSpec.TestFailureDetector -import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload } +import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload } import akka.remote.transport.TestTransport._ import akka.remote.transport.Transport._ import akka.remote.{ RemoteProtocol, RemoteActorRefProvider, FailureDetector } @@ -123,7 +123,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - testActor, + ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, failureDetector))) @@ -137,7 +137,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - testActor, + ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, failureDetector))) @@ -150,7 +150,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case InboundAssociation(h) ⇒ h } - wrappedHandle.readHandlerPromise.success(testActor) + wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) failureDetector.called must be(true) @@ -170,7 +170,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - testActor, + ActorAssociationEventListener(testActor), new AkkaProtocolSettings(conf), codec, failureDetector))) @@ -257,7 +257,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - testActor, + ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)), codec, failureDetector))) @@ -276,7 +276,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val reader = system.actorOf(Props(new ProtocolStateActor( localAddress, handle, - testActor, + ActorAssociationEventListener(testActor), new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)), codec, failureDetector))) @@ -288,7 +288,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case InboundAssociation(h) ⇒ h } - wrappedHandle.readHandlerPromise.success(testActor) + wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) failureDetector.called must be(true) @@ -350,7 +350,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case _ ⇒ fail() } - wrappedHandle.readHandlerPromise.success(testActor) + wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) lastActivityIsAssociate(registry, None) must be(true) @@ -388,7 +388,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case _ ⇒ fail() } - wrappedHandle.readHandlerPromise.success(testActor) + wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) Thread.sleep(100) //FIXME: Remove this @@ -421,7 +421,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case _ ⇒ fail() } - wrappedHandle.readHandlerPromise.success(testActor) + wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) lastActivityIsAssociate(registry, None) must be(true) @@ -459,7 +459,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re stateActor ! Disassociated - wrappedHandle.readHandlerPromise.success(testActor) + wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) expectMsg(Disassociated) diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala index acd9939621..da9bbc9ac8 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -80,7 +80,7 @@ class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with expectMsgPF(30 seconds) { case (received: Int, lost: Int) ⇒ - log.warning(s" ######## Received ${received - lost} messages from ${received} ########") + log.debug(s" ######## Received ${received - lost} messages from ${received} ########") } } } diff --git a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala index 6c22241a0d..7974584f2d 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala @@ -1,17 +1,20 @@ package akka.remote.transport import akka.actor.{ ExtendedActorSystem, Address } -import akka.remote.transport.AssociationHandle.Disassociated -import akka.remote.transport.AssociationHandle.InboundPayload +import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload } import akka.remote.transport.TestTransport._ -import akka.remote.transport.Transport.Fail -import akka.remote.transport.Transport.InboundAssociation -import akka.remote.transport.Transport.Ready -import akka.remote.transport.Transport.Status +import akka.remote.transport.Transport._ import akka.testkit.{ ImplicitSender, DefaultTimeout, AkkaSpec } import akka.util.ByteString import scala.concurrent.{ Future, Await } import akka.remote.RemoteActorRefProvider +import akka.remote.transport.Transport.InboundAssociation +import akka.remote.transport.TestTransport.DisassociateAttempt +import akka.remote.transport.TestTransport.WriteAttempt +import akka.remote.transport.TestTransport.ListenAttempt +import akka.remote.transport.Transport.Fail +import akka.remote.transport.TestTransport.AssociateAttempt +import akka.remote.transport.Transport.Ready abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """) @@ -61,8 +64,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) val transportB = newTransportB(registry) // Must complete the returned promise to receive events - Await.result(transportA.listen, timeout.duration)._2.success(self) - Await.result(transportB.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) + Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressATest, addressBTest)) @@ -79,7 +82,7 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) val registry = new AssociationRegistry val transportA = newTransportA(registry) - Await.result(transportA.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressATest)) Await.result(transportA.associate(nonExistingAddress), timeout.duration) match { @@ -93,8 +96,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) val transportA = newTransportA(registry) val transportB = newTransportB(registry) - Await.result(transportA.listen, timeout.duration)._2.success(self) - Await.result(transportB.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) + Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressATest, addressBTest)) @@ -106,8 +109,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) val Ready(handleA) = Await.result(associate, timeout.duration) // Initialize handles - handleA.readHandlerPromise.success(self) - handleB.readHandlerPromise.success(self) + handleA.readHandlerPromise.success(ActorHandleEventListener(self)) + handleB.readHandlerPromise.success(ActorHandleEventListener(self)) val payload = ByteString("PDU") val pdu = if (withAkkaProtocol) AkkaPduProtobufCodec.constructPayload(payload) else payload @@ -131,8 +134,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) val transportA = newTransportA(registry) val transportB = newTransportB(registry) - Await.result(transportA.listen, timeout.duration)._2.success(self) - Await.result(transportB.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) + Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressATest, addressBTest)) @@ -144,8 +147,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) val Ready(handleA) = Await.result(associate, timeout.duration) // Initialize handles - handleA.readHandlerPromise.success(self) - handleB.readHandlerPromise.success(self) + handleA.readHandlerPromise.success(ActorHandleEventListener(self)) + handleB.readHandlerPromise.success(ActorHandleEventListener(self)) awaitCond(registry.existsAssociation(addressATest, addressBTest)) diff --git a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala index 2c8eff1122..4c4381e023 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala @@ -6,7 +6,7 @@ import akka.actor.Address import akka.remote.transport.Transport._ import akka.remote.transport.TestTransport._ import akka.util.ByteString -import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload } +import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload } class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { @@ -37,8 +37,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender val transportB = new TestTransport(addressB, registry) // Must complete the returned promise to receive events - Await.result(transportA.listen, timeout.duration)._2.success(self) - Await.result(transportB.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) + Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressA, addressB)) @@ -54,7 +54,7 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender val registry = new AssociationRegistry var transportA = new TestTransport(addressA, registry) - Await.result(transportA.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) Await.result(transportA.associate(nonExistingAddress), timeout.duration) match { case Fail(_) ⇒ case _ ⇒ fail() @@ -66,8 +66,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender val transportA = new TestTransport(addressA, registry) val transportB = new TestTransport(addressB, registry) - Await.result(transportA.listen, timeout.duration)._2.success(self) - Await.result(transportB.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) + Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressA, addressB)) @@ -79,8 +79,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender val Ready(handleA) = Await.result(associate, timeout.duration) // Initialize handles - handleA.readHandlerPromise.success(self) - handleB.readHandlerPromise.success(self) + handleA.readHandlerPromise.success(ActorHandleEventListener(self)) + handleB.readHandlerPromise.success(ActorHandleEventListener(self)) val akkaPDU = ByteString("AkkaPDU") @@ -103,8 +103,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender val transportA = new TestTransport(addressA, registry) val transportB = new TestTransport(addressB, registry) - Await.result(transportA.listen, timeout.duration)._2.success(self) - Await.result(transportB.listen, timeout.duration)._2.success(self) + Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) + Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) awaitCond(registry.transportsReady(addressA, addressB)) @@ -116,8 +116,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender val Ready(handleA) = Await.result(associate, timeout.duration) // Initialize handles - handleA.readHandlerPromise.success(self) - handleB.readHandlerPromise.success(self) + handleA.readHandlerPromise.success(ActorHandleEventListener(self)) + handleB.readHandlerPromise.success(ActorHandleEventListener(self)) awaitCond(registry.existsAssociation(addressA, addressB))