diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index b6764f3465..ed9b220180 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -106,7 +106,10 @@ private[remote] object EndpointWriter { case object Writing extends State } -private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) +private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) { + def this(msg: String) = this(msg, null) +} + private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) extends EndpointException("Invalid address: " + remoteAddress, cause) @@ -190,7 +193,7 @@ private[remote] class EndpointWriter( handle match { case Some(h) ⇒ h.write(pdu) case None ⇒ throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" + - "handle is present.", null) + "handle is present.") } } catch { case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e)) @@ -202,7 +205,7 @@ private[remote] class EndpointWriter( } whenUnhandled { - case Event(Terminated(r), _) if Some(r) == reader ⇒ publishAndThrow(new EndpointException("Disassociated", null)) + case Event(Terminated(r), _) if Some(r) == reader ⇒ publishAndThrow(new EndpointException("Disassociated")) case Event(TakeOver(newHandle), _) ⇒ // Shutdown old reader handle foreach { _.disassociate() } @@ -230,14 +233,13 @@ private[remote] class EndpointWriter( } onTermination { - case StopEvent(_, _, _) ⇒ if (handle ne null) { + case StopEvent(_, _, _) ⇒ // FIXME: Add a test case for this // It is important to call unstashAll() for the stash to work properly and maintain messages during restart. // As the FSM trait does not call super.postStop(), this call is needed unstashAll() handle foreach { _.disassociate() } eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound)) - } } private def startReadEndpoint(): Unit = handle match { @@ -250,16 +252,18 @@ private[remote] class EndpointWriter( "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))) h.readHandlerPromise.success(ActorHandleEventListener(reader.get)) case None ⇒ throw new EndpointException("Internal error: No handle was present during creation of the endpoint" + - "reader.", null) + "reader.") } private def serializeMessage(msg: Any): MessageProtocol = handle match { + // FIXME: Unserializable messages should be dropped without closing the association. Should be logged, + // but without flooding the log. case Some(h) ⇒ Serialization.currentTransportAddress.withValue(h.localAddress) { (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])) } case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of" + - "outbound message.", null) + "outbound message.") } }