- Removed usage of null from Endpoint
- Fixed race resolution logic in FailureDetectorRegistry to not create unnecessary instances
This commit is contained in:
parent
9f006789fc
commit
d46a4b6916
11 changed files with 52 additions and 33 deletions
|
|
@ -93,8 +93,9 @@ private[remote] object EndpointWriter {
|
|||
/**
|
||||
* This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
|
||||
* to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the
|
||||
* same remote endpoint.
|
||||
* @param handle
|
||||
* same remote endpoint: when a parallel inbound association is detected, the old one is removed and the new one is
|
||||
* used instead.
|
||||
* @param handle Handle of the new inbound association.
|
||||
*/
|
||||
case class TakeOver(handle: AssociationHandle)
|
||||
case object BackoffTimer
|
||||
|
|
@ -130,7 +131,7 @@ private[remote] class EndpointWriter(
|
|||
override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) }
|
||||
|
||||
val msgDispatch =
|
||||
new DefaultMessageDispatcher(extendedSystem, extendedSystem.provider.asInstanceOf[RemoteActorRefProvider], log)
|
||||
new DefaultMessageDispatcher(extendedSystem, RARP(extendedSystem).provider, log)
|
||||
|
||||
def inbound = handle.isDefined
|
||||
|
||||
|
|
@ -238,8 +239,12 @@ private[remote] class EndpointWriter(
|
|||
|
||||
private def startReadEndpoint(): Unit = handle match {
|
||||
case Some(h) ⇒
|
||||
reader = Some(context.watch(context.actorOf(Props(new EndpointReader(codec, h.localAddress, msgDispatch)),
|
||||
"endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + readerId.next())))
|
||||
val readerLocalAddress = h.localAddress
|
||||
val readerCodec = codec
|
||||
val readerDispatcher = msgDispatch
|
||||
reader = Some(
|
||||
context.watch(context.actorOf(Props(new EndpointReader(readerCodec, readerLocalAddress, readerDispatcher)),
|
||||
"endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + readerId.next())))
|
||||
h.readHandlerPromise.success(reader.get)
|
||||
case None ⇒ throw new EndpointException("Internal error: No handle was present during creation of the endpoint" +
|
||||
"reader.", null)
|
||||
|
|
@ -261,7 +266,7 @@ private[remote] class EndpointReader(
|
|||
val localAddress: Address,
|
||||
val msgDispatch: InboundMessageDispatcher) extends Actor {
|
||||
|
||||
val provider = context.system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider]
|
||||
val provider = RARP(context.system).provider
|
||||
|
||||
override def receive: Receive = {
|
||||
case Disassociated ⇒ context.stop(self)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue