Reduce unwanted logging from remoting, see #2826
* Handle logging in EndpointManager supervisorStrategy * Added some more exception types to be able to differentiate failures
This commit is contained in:
parent
f4d59383d7
commit
a67fa18f8d
4 changed files with 49 additions and 24 deletions
|
|
@ -81,6 +81,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
|
|||
|
||||
Seq(".*received dead letter from.*ClientDisconnected",
|
||||
".*received dead letter from.*deadLetters.*PoisonPill",
|
||||
".*received dead letter from.*Disassociated",
|
||||
".*received dead letter from.*DisassociateUnderlying",
|
||||
".*received dead letter from.*HandleListenerRegistered",
|
||||
".*installing context org.jboss.netty.channel.DefaultChannelPipeline.*") foreach { s ⇒
|
||||
sys.eventStream.publish(Mute(EventFilter.warning(pattern = s)))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -124,9 +124,22 @@ private[remote] class EndpointException(msg: String, cause: Throwable) extends A
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
|
||||
extends EndpointException("Invalid address: " + remoteAddress, cause)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[remote] class EndpointDisassociatedException(msg: String) extends EndpointException(msg)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[remote] class EndpointAssociationException(msg: String, cause: Throwable) extends EndpointException(msg, cause)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -184,11 +197,9 @@ private[remote] class EndpointWriter(
|
|||
stash()
|
||||
stay()
|
||||
case Event(Status.Failure(e: InvalidAssociationException), _) ⇒
|
||||
log.error("Tried to associate with invalid remote address [{}]. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
||||
case Event(Status.Failure(e), _) ⇒
|
||||
publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e))
|
||||
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
|
||||
case Event(inboundHandle: AssociationHandle, _) ⇒
|
||||
// Assert handle == None?
|
||||
handle = Some(inboundHandle)
|
||||
|
|
@ -246,7 +257,7 @@ private[remote] class EndpointWriter(
|
|||
}
|
||||
|
||||
whenUnhandled {
|
||||
case Event(Terminated(r), _) if Some(r) == reader ⇒ publishAndThrow(new EndpointException("Disassociated"))
|
||||
case Event(Terminated(r), _) if r == reader.orNull ⇒ publishAndThrow(new EndpointDisassociatedException("Disassociated"))
|
||||
case Event(TakeOver(newHandle), _) ⇒
|
||||
// Shutdown old reader
|
||||
handle foreach { _.disassociate() }
|
||||
|
|
|
|||
|
|
@ -352,24 +352,34 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
Some(context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune))
|
||||
else None
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow) {
|
||||
case InvalidAssociation(localAddress, remoteAddress, e) ⇒
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
|
||||
Stop
|
||||
|
||||
case NonFatal(e) ⇒
|
||||
// Retrying immediately if the retry gate is disabled, and it is an endpoint used for writing.
|
||||
if (!retryGateEnabled && endpoints.isWritable(sender)) {
|
||||
// This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue
|
||||
// to the restarted endpoint -- thus no messages are lost
|
||||
Restart
|
||||
} else {
|
||||
// This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure,
|
||||
// keeps throwing away messages until the retry gate becomes open (time specified in RetryGateClosedFor)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor)
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
|
||||
case InvalidAssociation(localAddress, remoteAddress, _) ⇒
|
||||
log.error("Tried to associate with invalid remote address [{}]. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
|
||||
Stop
|
||||
}
|
||||
}
|
||||
|
||||
case NonFatal(e) ⇒
|
||||
|
||||
// logging
|
||||
e match {
|
||||
case _: EndpointDisassociatedException | _: EndpointAssociationException ⇒ // no logging
|
||||
case _ ⇒ log.error(e, e.getMessage)
|
||||
}
|
||||
|
||||
// Retrying immediately if the retry gate is disabled, and it is an endpoint used for writing.
|
||||
if (!retryGateEnabled && endpoints.isWritable(sender)) {
|
||||
// This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue
|
||||
// to the restarted endpoint -- thus no messages are lost
|
||||
Restart
|
||||
} else {
|
||||
// This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure,
|
||||
// keeps throwing away messages until the retry gate becomes open (time specified in RetryGateClosedFor)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor)
|
||||
Stop
|
||||
}
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Listen(addressesPromise) ⇒
|
||||
|
|
|
|||
|
|
@ -137,9 +137,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
}
|
||||
|
||||
"send error message for wrong address" in {
|
||||
filterEvents(EventFilter[EndpointException](occurrences = 6), EventFilter.error(start = "Association", occurrences = 6)) {
|
||||
system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
||||
}
|
||||
filterEvents(EventFilter.error(start = "Association", occurrences = 6),
|
||||
EventFilter.warning(pattern = ".*dead letter.*echo.*", occurrences = 1)) {
|
||||
system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
||||
}
|
||||
}
|
||||
|
||||
"support ask" in {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue