diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index d8b279af93..8984b39812 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -239,6 +239,12 @@ akka { # order of seconds or minutes. gate-unknown-addresses-for = 60 s + # This settings controls how long a system will be quarantined after + # catastrophic communication failures that result in the loss of system + # messages. Quarantining prevents communication with the remote system + # of a given UID. + quarantine-systems-for = 60s + # This setting defines the maximum number of unacknowledged system messages # allowed for a remote system. If this limit is reached the remote system is # declared to be dead and its UID marked as tainted. diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 3a6b6eb17e..6dfae9130a 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -127,6 +127,13 @@ private[remote] class EndpointDisassociatedException(msg: String) extends Endpoi @SerialVersionUID(1L) private[remote] class EndpointAssociationException(msg: String, cause: Throwable) extends EndpointException(msg, cause) +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[remote] class QuarantinedUidException(uid: Int, remoteAddress: Address) + extends EndpointException(s"Refused association to [$remoteAddress] because its UID [$uid] is quarantined.") + /** * INTERNAL API */ @@ -147,9 +154,10 @@ private[remote] object ReliableDeliverySupervisor { transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, + refuseUid: Option[Int], receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props = Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, transport, settings, - codec, receiveBuffers) + codec, refuseUid, receiveBuffers) } /** @@ -162,13 +170,14 @@ private[remote] class ReliableDeliverySupervisor( val transport: Transport, val settings: RemoteSettings, val codec: AkkaPduCodec, + val refuseUid: Option[Int], val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor { import ReliableDeliverySupervisor._ def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) { - case e @ (_: InvalidAssociation | _: HopelessAssociation) ⇒ Escalate + case e @ (_: InvalidAssociation | _: HopelessAssociation | _: QuarantinedUidException) ⇒ Escalate case NonFatal(e) ⇒ if (retryGateEnabled) { import context.dispatcher @@ -299,6 +308,7 @@ private[remote] class ReliableDeliverySupervisor( transport = transport, settings = settings, AkkaPduProtobufCodec, + refuseUid, receiveBuffers = receiveBuffers, reliableDeliverySupervisor = Some(self)) .withDispatcher("akka.remote.writer-dispatcher"), @@ -339,10 +349,11 @@ private[remote] object EndpointWriter { transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, + refuseUid: Option[Int], receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]], reliableDeliverySupervisor: Option[ActorRef]): Props = Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, transport, settings, codec, - receiveBuffers, reliableDeliverySupervisor) + refuseUid, receiveBuffers, reliableDeliverySupervisor) /** * This message signals that the current association maintained by the local EndpointWriter and EndpointReader is @@ -377,6 +388,7 @@ private[remote] class EndpointWriter( transport: Transport, settings: RemoteSettings, codec: AkkaPduCodec, + val refuseUid: Option[Int], val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]], val reliableDeliverySupervisor: Option[ActorRef]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with Stash with FSM[EndpointWriter.State, Unit] { @@ -441,6 +453,11 @@ private[remote] class EndpointWriter( case Event(Status.Failure(e), _) ⇒ publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e)) case Event(inboundHandle: AkkaProtocolHandle, _) ⇒ + refuseUid match { + case Some(uid) if inboundHandle.handshakeInfo.uid == uid ⇒ + publishAndThrow(new QuarantinedUidException(inboundHandle.handshakeInfo.uid, inboundHandle.remoteAddress)) + case _ ⇒ // Everything is fine + } // Assert handle == None? context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid) handle = Some(inboundHandle) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5d80522c90..862cb04a0f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -404,9 +404,12 @@ private[akka] class RemoteActorRefProvider( private def hasAddress(address: Address): Boolean = address == local.rootPath.address || address == rootPath.address || transport.addresses(address) - def quarantine(address: Address, uid: Int): Unit = { - // FIXME send to EndpointManager - } + /** + * Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses. + * @param address Address of the remote system to be quarantined + * @param uid UID of the remote system + */ + def quarantine(address: Address, uid: Int): Unit = transport.quarantine(address: Address, uid: Int) /** * INTERNAL API diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 73614e03f1..fd57f31735 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -23,33 +23,60 @@ class RemoteSettings(val config: Config) { val LogRemoteLifecycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events") - val ShutdownTimeout: Timeout = - Duration(getMilliseconds("akka.remote.shutdown-timeout"), MILLISECONDS) + val ShutdownTimeout: Timeout = { + Timeout(Duration(getMilliseconds("akka.remote.shutdown-timeout"), MILLISECONDS)) + } requiring (_.duration > Duration.Zero, "shutdown-timeout must be > 0") - val FlushWait: FiniteDuration = Duration(getMilliseconds("akka.remote.flush-wait-on-shutdown"), MILLISECONDS) + val FlushWait: FiniteDuration = { + Duration(getMilliseconds("akka.remote.flush-wait-on-shutdown"), MILLISECONDS) + } requiring (_ > Duration.Zero, "flush-wait-on-shutdown must be > 0") - val StartupTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.remote.startup-timeout"), MILLISECONDS)) + val StartupTimeout: Timeout = { + Timeout(Duration(getMilliseconds("akka.remote.startup-timeout"), MILLISECONDS)) + } requiring (_.duration > Duration.Zero, "startup-timeout must be > 0") - val RetryGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remote.retry-gate-closed-for"), MILLISECONDS) + val RetryGateClosedFor: FiniteDuration = { + Duration(getMilliseconds("akka.remote.retry-gate-closed-for"), MILLISECONDS) + } requiring (_ >= Duration.Zero, "retry-gate-closed-for must be >= 0") - val UnknownAddressGateClosedFor: FiniteDuration = Duration(getMilliseconds("akka.remote.gate-unknown-addresses-for"), MILLISECONDS) + val UnknownAddressGateClosedFor: FiniteDuration = { + Duration(getMilliseconds("akka.remote.gate-unknown-addresses-for"), MILLISECONDS) + } requiring (_ > Duration.Zero, "gate-unknown-addresses-for must be > 0") val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections") - val MaximumRetriesInWindow: Int = getInt("akka.remote.maximum-retries-in-window") + val MaximumRetriesInWindow: Int = { + getInt("akka.remote.maximum-retries-in-window") + } requiring (_ > 0, "maximum-retries-in-window must be > 0") - val RetryWindow: FiniteDuration = Duration(getMilliseconds("akka.remote.retry-window"), MILLISECONDS) + val RetryWindow: FiniteDuration = { + Duration(getMilliseconds("akka.remote.retry-window"), MILLISECONDS) + } requiring (_ > Duration.Zero, "retry-window must be > 0") - val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remote.backoff-interval"), MILLISECONDS) + val BackoffPeriod: FiniteDuration = { + Duration(getMilliseconds("akka.remote.backoff-interval"), MILLISECONDS) + } requiring (_ > Duration.Zero, "backoff-interval must be > 0") - val SysMsgAckTimeout: FiniteDuration = Duration(getMilliseconds("akka.remote.system-message-ack-piggyback-timeout"), MILLISECONDS) + val SysMsgAckTimeout: FiniteDuration = { + Duration(getMilliseconds("akka.remote.system-message-ack-piggyback-timeout"), MILLISECONDS) + } requiring (_ > Duration.Zero, "system-message-ack-piggyback-timeout must be > 0") - val SysResendTimeout: FiniteDuration = Duration(getMilliseconds("akka.remote.resend-interval"), MILLISECONDS) + val SysResendTimeout: FiniteDuration = { + Duration(getMilliseconds("akka.remote.resend-interval"), MILLISECONDS) + } requiring (_ > Duration.Zero, "resend-interval must be > 0") - val SysMsgBufferSize: Int = getInt("akka.remote.system-message-buffer-size") + val SysMsgBufferSize: Int = { + getInt("akka.remote.system-message-buffer-size") + } requiring (_ > 0, "system-message-buffer-size must be > 0") - val CommandAckTimeout: Timeout = + val QuarantineDuration: FiniteDuration = { + if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Zero + else Duration(getMilliseconds("akka.remote.quarantine-systems-for"), MILLISECONDS) + } requiring (_ >= Duration.Zero, "resend-interval must be > 0 or off") + + val CommandAckTimeout: Timeout = { Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS)) + } requiring (_.duration > Duration.Zero, "command-ack-timeout must be > 0") val WatchFailureDetectorConfig: Config = getConfig("akka.remote.watch-failure-detector") val WatchFailureDetectorImplementationClass: String = WatchFailureDetectorConfig.getString("implementation-class") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 7d891b2299..b89c36996b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -82,6 +82,13 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va */ def log: LoggingAdapter + /** + * Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses. + * @param address Address of the remote system to be quarantined + * @param uid UID of the remote system + */ + def quarantine(address: Address, uid: Int): Unit + /** * When this method returns true, some functionality will be turned off for security purposes. */ diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 6069ca67b4..d4f919e485 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -207,6 +207,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null) } + override def quarantine(remoteAddress: Address, uid: Int): Unit = endpointManager match { + case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid) + case _ ⇒ throw new RemoteTransportExceptionNoStackTrace( + s"Attempted to quarantine addres [$remoteAddress] with uid [$uid] but Remoting is not running", null) + } + // Not used anywhere only to keep compatibility with RemoteTransport interface protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode @@ -233,6 +239,7 @@ private[remote] object EndpointManager { // acknowledged delivery buffers def seq = seqOpt.get } + case class Quarantine(remoteAddress: Address, uid: Int) extends RemotingCommand case class ManagementCommand(cmd: Any) extends RemotingCommand case class ManagementCommandAck(status: Boolean) @@ -257,7 +264,7 @@ private[remote] object EndpointManager { case class Gated(timeOfRelease: Deadline) extends EndpointPolicy { override def isTombstone: Boolean = true } - case class Quarantined(reason: Throwable) extends EndpointPolicy { + case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy { override def isTombstone: Boolean = true } @@ -309,9 +316,9 @@ private[remote] object EndpointManager { def isReadOnly(endpoint: ActorRef): Boolean = readonlyToAddress contains endpoint - def isQuarantined(address: Address): Boolean = writableEndpointWithPolicyFor(address) match { - case Some(Quarantined(_)) ⇒ true - case _ ⇒ false + def isQuarantined(address: Address, uid: Int): Boolean = writableEndpointWithPolicyFor(address) match { + case Some(Quarantined(`uid`, timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft() + case _ ⇒ false } def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit = @@ -323,14 +330,16 @@ private[remote] object EndpointManager { readonlyToAddress -= endpoint } - def markAsQuarantined(address: Address, reason: Throwable): Unit = addressToWritable += address -> Quarantined(reason) + def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = + addressToWritable += address -> Quarantined(uid, timeOfRelease) def allEndpoints: collection.Iterable[ActorRef] = writableToAddress.keys ++ readonlyToAddress.keys - def pruneGatedEntries(): Unit = { + def prune(): Unit = { addressToWritable = addressToWritable.filter { - case (_, Gated(timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft - case _ ⇒ true + case (_, Gated(timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft + case (_, Quarantined(_, timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft + case _ ⇒ true } } } @@ -371,14 +380,28 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop - case e @ HopelessAssociation(localAddress, remoteAddress, uid, _) ⇒ - log.error("Association with [{}] having uid [{}] is irrecoverably failed. UID is now quarantined and all " + - "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover" + - "from this situation.", remoteAddress, uid) - endpoints.markAsQuarantined(remoteAddress, e) // TODO: quarantine uid + case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒ + if (settings.QuarantineDuration > Duration.Zero) { + log.error("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " + + "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " + + "from this situation.", remoteAddress, uid) + endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) + } context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop + case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ + if (settings.QuarantineDuration > Duration.Zero) { + log.error("Association to [{}] with unknown UID is irrecoverably failed. " + + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) + endpoints.markAsFailed(sender, Deadline.now + settings.QuarantineDuration) + } + context.system.eventStream.publish(AddressTerminated(remoteAddress)) + Stop + + case _: QuarantinedUidException ⇒ + Stop + case NonFatal(e) ⇒ // logging e match { @@ -428,31 +451,33 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case s @ Send(message, senderOption, recipientRef, _) ⇒ val recipientAddress = recipientRef.path.address - def createAndRegisterWritingEndpoint(): ActorRef = endpoints.registerWritableEndpoint(recipientAddress, createEndpoint( + def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef = endpoints.registerWritableEndpoint(recipientAddress, createEndpoint( recipientAddress, recipientRef.localAddressToUse, transportMapping(recipientRef.localAddressToUse), settings, - None, + handleOption = None, + refuseUid, writing = true)) endpoints.writableEndpointWithPolicyFor(recipientAddress) match { case Some(Pass(endpoint)) ⇒ endpoint ! s case Some(Gated(timeOfRelease)) ⇒ - if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s + if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(None) ! s else extendedSystem.deadLetters ! s - case Some(Quarantined(_)) ⇒ - extendedSystem.deadLetters ! s + case Some(Quarantined(uid, timeOfRelease)) ⇒ + if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(Some(uid)) ! s + else createAndRegisterWritingEndpoint(None) ! s case None ⇒ - createAndRegisterWritingEndpoint() ! s + createAndRegisterWritingEndpoint(None) ! s } case InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match { case Some(endpoint) ⇒ endpoint ! EndpointWriter.TakeOver(handle) case None ⇒ - if (endpoints.isQuarantined(handle.remoteAddress)) handle.disassociate() + if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate() else { val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress) eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, true)) @@ -462,6 +487,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transportMapping(handle.localAddress), settings, Some(handle), + refuseUid = None, writing) if (writing) endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint) @@ -472,7 +498,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Terminated(endpoint) ⇒ endpoints.unregisterEndpoint(endpoint) case Prune ⇒ - endpoints.pruneGatedEntries() + endpoints.prune() case ShutdownAndFlush ⇒ // Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully) val sys = context.system // Avoid closing over context @@ -543,6 +569,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport: Transport, endpointSettings: RemoteSettings, handleOption: Option[AkkaProtocolHandle], + refuseUid: Option[Int], writing: Boolean): ActorRef = { assert(transportMapping contains localAddress) @@ -553,6 +580,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport, endpointSettings, AkkaPduProtobufCodec, + refuseUid, receiveBuffers).withDispatcher("akka.remote.writer-dispatcher"), "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) else context.watch(context.actorOf(EndpointWriter( @@ -562,6 +590,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport, endpointSettings, AkkaPduProtobufCodec, + refuseUid, receiveBuffers, reliableDeliverySupervisor = None).withDispatcher("akka.remote.writer-dispatcher"), "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala index 15f642c528..cc2119689b 100644 --- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala @@ -27,7 +27,7 @@ class EndpointRegistrySpec extends AkkaSpec { reg.isWritable(actorA) must be(true) reg.isReadOnly(actorA) must be(false) - reg.isQuarantined(address1) must be(false) + reg.isQuarantined(address1, 42) must be(false) } "be able to register a read-only endpoint" in { @@ -40,7 +40,7 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) must be === None reg.isWritable(actorA) must be(false) reg.isReadOnly(actorA) must be(true) - reg.isQuarantined(address1) must be(false) + reg.isQuarantined(address1, 42) must be(false) } "be able to register a writable and a read-only endpoint correctly" in { @@ -89,13 +89,13 @@ class EndpointRegistrySpec extends AkkaSpec { reg.registerWritableEndpoint(address2, actorB) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) - reg.markAsQuarantined(address2, null) + reg.markAsQuarantined(address2, 42, deadline) reg.unregisterEndpoint(actorA) reg.unregisterEndpoint(actorB) reg.writableEndpointWithPolicyFor(address1) must be === Some(Gated(deadline)) - reg.writableEndpointWithPolicyFor(address2) must be === Some(Quarantined(null)) + reg.writableEndpointWithPolicyFor(address2) must be === Some(Quarantined(42, deadline)) } @@ -107,7 +107,7 @@ class EndpointRegistrySpec extends AkkaSpec { reg.markAsFailed(actorA, Deadline.now) val farInTheFuture = Deadline.now + Duration(60, SECONDS) reg.markAsFailed(actorB, farInTheFuture) - reg.pruneGatedEntries() + reg.prune() reg.writableEndpointWithPolicyFor(address1) must be === None reg.writableEndpointWithPolicyFor(address2) must be === Some(Gated(farInTheFuture)) @@ -115,11 +115,13 @@ class EndpointRegistrySpec extends AkkaSpec { "be able to register Quarantined policy for an address" in { val reg = new EndpointRegistry + val deadline = Deadline.now + 30.minutes reg.writableEndpointWithPolicyFor(address1) must be === None - reg.markAsQuarantined(address1, null) - reg.isQuarantined(address1) must be(true) - reg.writableEndpointWithPolicyFor(address1) must be === Some(Quarantined(null)) + reg.markAsQuarantined(address1, 42, deadline) + reg.isQuarantined(address1, 42) must be(true) + reg.isQuarantined(address1, 33) must be(false) + reg.writableEndpointWithPolicyFor(address1) must be === Some(Quarantined(42, deadline)) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 1281e7c80a..11d466e086 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -40,6 +40,7 @@ class RemoteConfigSpec extends AkkaSpec( SysMsgAckTimeout must be(0.3 seconds) SysResendTimeout must be(1 seconds) SysMsgBufferSize must be(1000) + QuarantineDuration must be(60 seconds) CommandAckTimeout.duration must be(30 seconds) Transports.size must be(1) Transports.head._1 must be(classOf[akka.remote.transport.netty.NettyTransport].getName)