Fixed review comments
- Using Duration.Undefined - Quarantine message is actually handled ;)
This commit is contained in:
parent
33e2710bf9
commit
444b73bec6
3 changed files with 36 additions and 14 deletions
|
|
@ -242,7 +242,8 @@ akka {
|
|||
# This settings controls how long a system will be quarantined after
|
||||
# catastrophic communication failures that result in the loss of system
|
||||
# messages. Quarantining prevents communication with the remote system
|
||||
# of a given UID.
|
||||
# of a given UID. This function can be disabled by setting the value
|
||||
# to "off".
|
||||
quarantine-systems-for = 60s
|
||||
|
||||
# This setting defines the maximum number of unacknowledged system messages
|
||||
|
|
|
|||
|
|
@ -69,10 +69,11 @@ class RemoteSettings(val config: Config) {
|
|||
getInt("akka.remote.system-message-buffer-size")
|
||||
} requiring (_ > 0, "system-message-buffer-size must be > 0")
|
||||
|
||||
val QuarantineDuration: FiniteDuration = {
|
||||
if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Zero
|
||||
else Duration(getMilliseconds("akka.remote.quarantine-systems-for"), MILLISECONDS)
|
||||
} requiring (_ >= Duration.Zero, "resend-interval must be > 0 or off")
|
||||
val QuarantineDuration: Duration = {
|
||||
if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Undefined
|
||||
else Duration(getMilliseconds("akka.remote.quarantine-systems-for"), MILLISECONDS) requiring
|
||||
(_ >= Duration.Zero, "quarantine-systems-for must be > 0 or off")
|
||||
}
|
||||
|
||||
val CommandAckTimeout: Timeout = {
|
||||
Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS))
|
||||
|
|
|
|||
|
|
@ -381,20 +381,24 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒
|
||||
if (settings.QuarantineDuration > Duration.Zero) {
|
||||
log.error("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " +
|
||||
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " +
|
||||
"from this situation.", remoteAddress, uid)
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
log.error("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " +
|
||||
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " +
|
||||
"from this situation.", remoteAddress, uid)
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
|
||||
case _ ⇒ // disabled
|
||||
}
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
|
||||
if (settings.QuarantineDuration > Duration.Zero) {
|
||||
log.error("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.QuarantineDuration)
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
log.error("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
endpoints.markAsFailed(sender, Deadline.now + d)
|
||||
case _ ⇒
|
||||
}
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
|
@ -434,6 +438,22 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
addressesPromise.success(transportsAndAddresses)
|
||||
case ManagementCommand(_) ⇒
|
||||
sender ! ManagementCommandAck(false)
|
||||
case Quarantine(address, uid) ⇒
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
// Stop writers
|
||||
endpoints.writableEndpointWithPolicyFor(address) match {
|
||||
case Some(Pass(endpoint)) ⇒ context.stop(endpoint)
|
||||
case _ ⇒ // nothing to stop
|
||||
}
|
||||
// Stop inbound read-only associations
|
||||
endpoints.readOnlyEndpointFor(address) match {
|
||||
case Some(endpoint) ⇒ context.stop(endpoint)
|
||||
case _ ⇒ // nothing to stop
|
||||
}
|
||||
endpoints.markAsQuarantined(address, uid, Deadline.now + d)
|
||||
case _ ⇒ // Ignore
|
||||
}
|
||||
case StartupFinished ⇒
|
||||
context.become(accepting)
|
||||
case ShutdownAndFlush ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue