Minor restructuring of the EndpointWriter

This commit is contained in:
Viktor Klang 2012-12-21 18:11:56 +01:00
parent 40204de3e7
commit bf2593300b

View file

@ -134,8 +134,7 @@ private[remote] class EndpointWriter(
override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) publishAndThrow(e) } override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) publishAndThrow(e) }
val msgDispatch = val msgDispatch = new DefaultMessageDispatcher(extendedSystem, RARP(extendedSystem).provider, log)
new DefaultMessageDispatcher(extendedSystem, RARP(extendedSystem).provider, log)
def inbound = handle.isDefined def inbound = handle.isDefined
@ -151,15 +150,17 @@ private[remote] class EndpointWriter(
preStart() preStart()
} }
override def preStart(): Unit = { override def preStart(): Unit =
if (!inbound) { startWith(
transport.associate(remoteAddress) pipeTo self handle match {
startWith(Initializing, ()) case Some(h)
} else { reader = startReadEndpoint(h)
startReadEndpoint() Writing
startWith(Writing, ()) case None
} transport.associate(remoteAddress) pipeTo self
} Initializing
},
())
when(Initializing) { when(Initializing) {
case Event(Send(msg, senderOption, recipient), _) case Event(Send(msg, senderOption, recipient), _)
@ -172,8 +173,9 @@ private[remote] class EndpointWriter(
case Event(Status.Failure(e), _) case Event(Status.Failure(e), _)
publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e)) publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e))
case Event(inboundHandle: AssociationHandle, _) case Event(inboundHandle: AssociationHandle, _)
// Assert handle == None?
handle = Some(inboundHandle) handle = Some(inboundHandle)
startReadEndpoint() reader = startReadEndpoint(inboundHandle)
goto(Writing) goto(Writing)
} }
@ -188,19 +190,20 @@ private[remote] class EndpointWriter(
when(Writing) { when(Writing) {
case Event(Send(msg, senderOption, recipient), _) case Event(Send(msg, senderOption, recipient), _)
val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption) try {
val success = try {
handle match { handle match {
case Some(h) h.write(pdu) case Some(h)
case None throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" + val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption)
"handle is present.") if (h.write(pdu)) stay() else {
stash()
goto(Buffering)
}
case None
throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
} }
} catch { } catch {
case NonFatal(e) publishAndThrow(new EndpointException("Failed to write message to the transport", e)) case NonFatal(e: EndpointException) publishAndThrow(e)
} case NonFatal(e) publishAndThrow(new EndpointException("Failed to write message to the transport", e))
if (success) stay() else {
stash()
goto(Buffering)
} }
} }
@ -209,14 +212,9 @@ private[remote] class EndpointWriter(
case Event(TakeOver(newHandle), _) case Event(TakeOver(newHandle), _)
// Shutdown old reader // Shutdown old reader
handle foreach { _.disassociate() } handle foreach { _.disassociate() }
reader match { reader foreach { r context stop context.unwatch(r) }
case Some(r)
context.unwatch(r)
context.stop(r)
case None
}
handle = Some(newHandle) handle = Some(newHandle)
startReadEndpoint() reader = startReadEndpoint(newHandle)
unstashAll() unstashAll()
goto(Writing) goto(Writing)
} }
@ -242,17 +240,15 @@ private[remote] class EndpointWriter(
eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound)) eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound))
} }
private def startReadEndpoint(): Unit = handle match { private def startReadEndpoint(handle: AssociationHandle): Some[ActorRef] = {
case Some(h) val readerLocalAddress = handle.localAddress
val readerLocalAddress = h.localAddress val readerCodec = codec
val readerCodec = codec val readerDispatcher = msgDispatch
val readerDispatcher = msgDispatch val newReader =
reader = Some( context.watch(context.actorOf(Props(new EndpointReader(readerCodec, readerLocalAddress, readerDispatcher)),
context.watch(context.actorOf(Props(new EndpointReader(readerCodec, readerLocalAddress, readerDispatcher)), "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))) handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
h.readHandlerPromise.success(ActorHandleEventListener(reader.get)) Some(newReader)
case None throw new EndpointException("Internal error: No handle was present during creation of the endpoint" +
"reader.")
} }
private def serializeMessage(msg: Any): MessageProtocol = handle match { private def serializeMessage(msg: Any): MessageProtocol = handle match {