Removed implicit conversions from ActorRef to AssociationEventListener and HandleEventListener

This commit is contained in:
Endre Sándor Varga 2012-12-18 12:54:17 +01:00
parent 752e43a0e7
commit 4c7691fe40
13 changed files with 72 additions and 69 deletions

View file

@ -248,7 +248,7 @@ private[remote] class EndpointWriter(
reader = Some(
context.watch(context.actorOf(Props(new EndpointReader(readerCodec, readerLocalAddress, readerDispatcher)),
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())))
h.readHandlerPromise.success(reader.get)
h.readHandlerPromise.success(ActorHandleEventListener(reader.get))
case None throw new EndpointException("Internal error: No handle was present during creation of the endpoint" +
"reader.", null)
}

View file

@ -26,13 +26,13 @@ object RemoteActorRefProvider {
private class RemotingTerminator extends Actor with FSM[TerminatorState, Option[Internals]] {
import context.dispatcher
val systemGuardian = context.system.asInstanceOf[ExtendedActorSystem].provider.systemGuardian
val systemGuardian = context.actorFor("/system")
startWith(Uninitialized, None)
when(Uninitialized) {
case Event(i: Internals, _)
systemGuardian.tell(RegisterTerminationHook, self)
systemGuardian ! RegisterTerminationHook
goto(Idle) using Some(i)
}
@ -46,8 +46,7 @@ object RemoteActorRefProvider {
// TODO: state timeout
when(WaitDaemonShutdown) {
case Event(TerminationHookDone, Some(internals))
log.info("Remote daemon shut down.")
log.info("Shutting down remoting.")
log.info("Remote daemon shut down; proceeding with flushing remote transports.")
internals.transport.shutdown() pipeTo self
goto(WaitTransportShutdown)
}
@ -55,7 +54,7 @@ object RemoteActorRefProvider {
when(WaitTransportShutdown) {
case Event((), _)
log.info("Remoting shut down.")
systemGuardian.tell(TerminationHookDone, self)
systemGuardian ! TerminationHookDone
stop()
}
@ -141,8 +140,8 @@ class RemoteActorRefProvider(
}).get
})
remotingTerminator ! internals
_internals = internals
remotingTerminator ! internals
_log = Logging(eventStream, "RemoteActorRefProvider")

View file

@ -8,8 +8,7 @@ import akka.japi.Util.immutableSeq
import akka.pattern.{ gracefulStop, pipe, ask }
import akka.remote.EndpointManager._
import akka.remote.Remoting.TransportSupervisor
import akka.remote.transport.Transport.AssociationEventListener
import akka.remote.transport.Transport.InboundAssociation
import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation }
import akka.remote.transport._
import akka.util.Timeout
import com.typesafe.config.Config
@ -386,7 +385,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
// Register to each transport as listener and collect mapping to addresses
val transportsAndAddresses = results map {
case (transport, address, promise)
promise.success(self)
promise.success(ActorAssociationEventListener(self))
transport -> address
}
addressesPromise.success(transportsAndAddresses)

View file

@ -54,7 +54,7 @@ trait SchemeAugmenter {
/**
* An adapter that wraps a transport and provides interception
*/
abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport, implicit val ec: ExecutionContext)
abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport)(implicit val ec: ExecutionContext)
extends Transport with SchemeAugmenter {
protected def maximumOverhead: Int
@ -125,7 +125,7 @@ object ActorTransportAdapter {
}
abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
extends AbstractTransportAdapter(wrappedTransport, system.dispatcher) {
extends AbstractTransportAdapter(wrappedTransport)(system.dispatcher) {
import ActorTransportAdapter._
private implicit val timeout = new Timeout(3 seconds)
@ -140,9 +140,12 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS
override def interceptListen(listenAddress: Address,
listenerPromise: Future[AssociationEventListener]): Future[AssociationEventListener] = {
registerManager().map { mgr
// Side effecting: storing the manager instance in volatile var
// This is done only once: during the initialization of the protocol stack. The variable manager is not read
// before listen is called.
manager = mgr
manager ! ListenUnderlying(listenAddress, listenerPromise)
manager
ActorAssociationEventListener(manager)
}
}

View file

@ -280,7 +280,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
startWith(Closed, d)
case d: InboundUnassociated
d.wrappedHandle.readHandlerPromise.success(self)
d.wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self))
startWith(WaitActivity, d)
}
@ -296,7 +296,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
stop()
case Event(Ready(wrappedHandle), OutboundUnassociated(_, statusPromise, _))
wrappedHandle.readHandlerPromise.success(self)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self))
sendAssociate(wrappedHandle)
failureDetector.heartbeat()
initTimers()

View file

@ -33,7 +33,7 @@ private[remote] object FailureInjectorTransportAdapter {
}
private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem)
extends AbstractTransportAdapter(wrappedTransport, extendedSystem.dispatcher) with AssociationEventListener {
extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) with AssociationEventListener {
import extendedSystem.dispatcher
@ -62,6 +62,9 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
listenerFuture: Future[AssociationEventListener]): Future[AssociationEventListener] = {
log.warning("FailureInjectorTransport is active on this system. Gremlins might munch your packets.")
listenerFuture.onSuccess {
// Side effecting: As this class is not an actor, the only way to safely modify state is through volatile vars.
// Listen is called only during the initialization of the stack, and upstreamListener is not read before this
// finishes.
case listener: AssociationEventListener upstreamListener = Some(listener)
}
Future.successful(this)
@ -126,9 +129,9 @@ private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHand
wrappedHandle.readHandlerPromise.success(this)
}
override def write(payload: ByteString): Boolean = if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress))
wrappedHandle.write(payload)
else true
override def write(payload: ByteString): Boolean =
if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress)) wrappedHandle.write(payload)
else true
override def disassociate(): Unit = wrappedHandle.disassociate()

View file

@ -7,7 +7,7 @@ import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenerRegistered
import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload, HandleEventListener }
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener }
import akka.remote.transport.ThrottledAssociation._
import akka.remote.transport.ThrottlerManager.Checkin
import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle
@ -255,14 +255,14 @@ private[transport] class ThrottledAssociation(
override def postStop(): Unit = originalHandle.disassociate()
if (inbound) startWith(WaitExposedHandle, Uninitialized) else {
originalHandle.readHandlerPromise.success(self)
originalHandle.readHandlerPromise.success(ActorHandleEventListener(self))
startWith(WaitModeAndUpstreamListener, Uninitialized)
}
when(WaitExposedHandle) {
case Event(handle: ThrottlerHandle, Uninitialized)
// register to downstream layer and wait for origin
originalHandle.readHandlerPromise.success(self)
originalHandle.readHandlerPromise.success(ActorHandleEventListener(self))
goto(WaitOrigin) using ExposedHandle(handle)
}

View file

@ -68,8 +68,6 @@ object Transport {
override def notify(ev: AssociationEvent): Unit = actor ! ev
}
implicit def actorRef2HandleEventListener(actor: ActorRef): AssociationEventListener =
ActorAssociationEventListener(actor)
}
/**
@ -199,8 +197,6 @@ object AssociationHandle {
case class ActorHandleEventListener(actor: ActorRef) extends HandleEventListener {
override def notify(ev: HandleEvent): Unit = actor ! ev
}
implicit def actorRef2HandleEventListener(actor: ActorRef): HandleEventListener = ActorHandleEventListener(actor)
}
/**

View file

@ -19,7 +19,7 @@ class RemoteConfigSpec extends AkkaSpec(
}
""") {
// These tests are ignored as it tests configuration specific to the old remoting.
// FIXME: These tests are ignored as it tests configuration specific to the old remoting.
"Remoting" must {
"be able to parse generic remote config elements" ignore {

View file

@ -3,7 +3,7 @@ package akka.remote.transport
import akka.actor.{ ExtendedActorSystem, Address, Props }
import akka.remote.transport.AkkaPduCodec.{ Disassociate, Associate, Heartbeat }
import akka.remote.transport.AkkaProtocolSpec.TestFailureDetector
import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload }
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.TestTransport._
import akka.remote.transport.Transport._
import akka.remote.{ RemoteProtocol, RemoteActorRefProvider, FailureDetector }
@ -123,7 +123,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
system.actorOf(Props(new ProtocolStateActor(
localAddress,
handle,
testActor,
ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(conf),
codec,
failureDetector)))
@ -137,7 +137,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress,
handle,
testActor,
ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(conf),
codec,
failureDetector)))
@ -150,7 +150,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
case InboundAssociation(h) h
}
wrappedHandle.readHandlerPromise.success(testActor)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
failureDetector.called must be(true)
@ -170,7 +170,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress,
handle,
testActor,
ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(conf),
codec,
failureDetector)))
@ -257,7 +257,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress,
handle,
testActor,
ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)),
codec,
failureDetector)))
@ -276,7 +276,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val reader = system.actorOf(Props(new ProtocolStateActor(
localAddress,
handle,
testActor,
ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(ConfigFactory.parseString("akka.remoting.require-cookie = on").withFallback(conf)),
codec,
failureDetector)))
@ -288,7 +288,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
case InboundAssociation(h) h
}
wrappedHandle.readHandlerPromise.success(testActor)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
failureDetector.called must be(true)
@ -350,7 +350,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
case _ fail()
}
wrappedHandle.readHandlerPromise.success(testActor)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
lastActivityIsAssociate(registry, None) must be(true)
@ -388,7 +388,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
case _ fail()
}
wrappedHandle.readHandlerPromise.success(testActor)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
Thread.sleep(100) //FIXME: Remove this
@ -421,7 +421,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
case _ fail()
}
wrappedHandle.readHandlerPromise.success(testActor)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
lastActivityIsAssociate(registry, None) must be(true)
@ -459,7 +459,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
stateActor ! Disassociated
wrappedHandle.readHandlerPromise.success(testActor)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
expectMsg(Disassociated)

View file

@ -80,7 +80,7 @@ class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with
expectMsgPF(30 seconds) {
case (received: Int, lost: Int)
log.warning(s" ######## Received ${received - lost} messages from ${received} ########")
log.debug(s" ######## Received ${received - lost} messages from ${received} ########")
}
}
}

View file

@ -1,17 +1,20 @@
package akka.remote.transport
import akka.actor.{ ExtendedActorSystem, Address }
import akka.remote.transport.AssociationHandle.Disassociated
import akka.remote.transport.AssociationHandle.InboundPayload
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.TestTransport._
import akka.remote.transport.Transport.Fail
import akka.remote.transport.Transport.InboundAssociation
import akka.remote.transport.Transport.Ready
import akka.remote.transport.Transport.Status
import akka.remote.transport.Transport._
import akka.testkit.{ ImplicitSender, DefaultTimeout, AkkaSpec }
import akka.util.ByteString
import scala.concurrent.{ Future, Await }
import akka.remote.RemoteActorRefProvider
import akka.remote.transport.Transport.InboundAssociation
import akka.remote.transport.TestTransport.DisassociateAttempt
import akka.remote.transport.TestTransport.WriteAttempt
import akka.remote.transport.TestTransport.ListenAttempt
import akka.remote.transport.Transport.Fail
import akka.remote.transport.TestTransport.AssociateAttempt
import akka.remote.transport.Transport.Ready
abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
extends AkkaSpec("""akka.actor.provider = "akka.remote.RemoteActorRefProvider" """)
@ -61,8 +64,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
val transportB = newTransportB(registry)
// Must complete the returned promise to receive events
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressATest, addressBTest))
@ -79,7 +82,7 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
val registry = new AssociationRegistry
val transportA = newTransportA(registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressATest))
Await.result(transportA.associate(nonExistingAddress), timeout.duration) match {
@ -93,8 +96,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
val transportA = newTransportA(registry)
val transportB = newTransportB(registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressATest, addressBTest))
@ -106,8 +109,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
val Ready(handleA) = Await.result(associate, timeout.duration)
// Initialize handles
handleA.readHandlerPromise.success(self)
handleB.readHandlerPromise.success(self)
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
handleB.readHandlerPromise.success(ActorHandleEventListener(self))
val payload = ByteString("PDU")
val pdu = if (withAkkaProtocol) AkkaPduProtobufCodec.constructPayload(payload) else payload
@ -131,8 +134,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
val transportA = newTransportA(registry)
val transportB = newTransportB(registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressATest, addressBTest))
@ -144,8 +147,8 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
val Ready(handleA) = Await.result(associate, timeout.duration)
// Initialize handles
handleA.readHandlerPromise.success(self)
handleB.readHandlerPromise.success(self)
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
handleB.readHandlerPromise.success(ActorHandleEventListener(self))
awaitCond(registry.existsAssociation(addressATest, addressBTest))

View file

@ -6,7 +6,7 @@ import akka.actor.Address
import akka.remote.transport.Transport._
import akka.remote.transport.TestTransport._
import akka.util.ByteString
import akka.remote.transport.AssociationHandle.{ Disassociated, InboundPayload }
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload }
class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
@ -37,8 +37,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
val transportB = new TestTransport(addressB, registry)
// Must complete the returned promise to receive events
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressA, addressB))
@ -54,7 +54,7 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
val registry = new AssociationRegistry
var transportA = new TestTransport(addressA, registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
Await.result(transportA.associate(nonExistingAddress), timeout.duration) match {
case Fail(_)
case _ fail()
@ -66,8 +66,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
val transportA = new TestTransport(addressA, registry)
val transportB = new TestTransport(addressB, registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressA, addressB))
@ -79,8 +79,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
val Ready(handleA) = Await.result(associate, timeout.duration)
// Initialize handles
handleA.readHandlerPromise.success(self)
handleB.readHandlerPromise.success(self)
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
handleB.readHandlerPromise.success(ActorHandleEventListener(self))
val akkaPDU = ByteString("AkkaPDU")
@ -103,8 +103,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
val transportA = new TestTransport(addressA, registry)
val transportB = new TestTransport(addressB, registry)
Await.result(transportA.listen, timeout.duration)._2.success(self)
Await.result(transportB.listen, timeout.duration)._2.success(self)
Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
Await.result(transportB.listen, timeout.duration)._2.success(ActorAssociationEventListener(self))
awaitCond(registry.transportsReady(addressA, addressB))
@ -116,8 +116,8 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
val Ready(handleA) = Await.result(associate, timeout.duration)
// Initialize handles
handleA.readHandlerPromise.success(self)
handleB.readHandlerPromise.success(self)
handleA.readHandlerPromise.success(ActorHandleEventListener(self))
handleB.readHandlerPromise.success(ActorHandleEventListener(self))
awaitCond(registry.existsAssociation(addressA, addressB))