diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 8762e1d940..673fee3706 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -139,8 +139,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { else { val cache = manifestCache.get cache.get(manifest) match { - case Some(cachedClassManifest) => s1.fromBinary(bytes, cachedClassManifest) - case None => + case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest) + case None ⇒ system.dynamicAccess.getClassFor[AnyRef](manifest) match { case Success(classManifest) ⇒ val classManifestOption: Option[Class[_]] = Some(classManifest) @@ -167,7 +167,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { "akka.actor.serializers is not in synch between the two systems.") } serializer match { - case ser: ByteBufferSerializer => + case ser: ByteBufferSerializer ⇒ ser.fromBinary(buf, manifest) case _ ⇒ val bytes = Array.ofDim[Byte](buf.remaining()) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 03d73b1fde..8c09e80f39 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -46,7 +46,8 @@ class CodecBenchmark { remote.artery.hostname = localhost remote.artery.port = 0 } - """) + """ + ) implicit val system = ActorSystem("CodecBenchmark", config) val systemB = ActorSystem("systemB", system.settings.config) @@ -56,8 +57,10 @@ class CodecBenchmark { val headerIn = HeaderBuilder(compression) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) - val uniqueLocalAddress = UniqueAddress(system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, - AddressUidExtension(system).addressUid) + val uniqueLocalAddress = UniqueAddress( + system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, + AddressUidExtension(system).addressUid + ) val payload = Array.ofDim[Byte](1000) private var materializer: ActorMaterializer = _ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala index 0a4e68143a..713c02160a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala @@ -65,11 +65,13 @@ abstract class QuickRestartSpec runOn(second) { restartingSystem = if (restartingSystem == null) - ActorSystem(system.name, + ActorSystem( + system.name, ConfigFactory.parseString(s"akka.cluster.roles = [round-$n]") .withFallback(system.settings.config)) else - ActorSystem(system.name, + ActorSystem( + system.name, ConfigFactory.parseString(s""" akka.cluster.roles = [round-$n] akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}""") // same port diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index ef76581368..bd0446bce2 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -195,11 +195,11 @@ object MultiNodeSpec { require(selfIndex >= 0 && selfIndex < maxNodes, "multinode.index is out of bounds: " + selfIndex) private[testkit] val nodeConfig = mapToConfig(Map( - "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", - "akka.remote.artery.hostname" -> selfName, - "akka.remote.netty.tcp.hostname" -> selfName, - "akka.remote.netty.tcp.port" -> selfPort, - "akka.remote.artery.port" -> selfPort)) + "akka.actor.provider" → "akka.remote.RemoteActorRefProvider", + "akka.remote.artery.hostname" → selfName, + "akka.remote.netty.tcp.hostname" → selfName, + "akka.remote.netty.tcp.port" → selfPort, + "akka.remote.artery.port" → selfPort)) private[testkit] val baseConfig: Config = ConfigFactory.parseString(""" akka { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 27159795b1..8dfd030aef 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -57,10 +57,10 @@ object AeronStreamLatencySpec extends MultiNodeConfig { """))) final case class TestSettings( - testName: String, + testName: String, messageRate: Int, // msg/s payloadSize: Int, - repeat: Int) + repeat: Int) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 110ce66554..e8460d8b4e 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -50,9 +50,9 @@ object AeronStreamMaxThroughputSpec extends MultiNodeConfig { """))) final case class TestSettings( - testName: String, + testName: String, totalMessages: Long, - payloadSize: Int) + payloadSize: Int) def iterate(start: Long, end: Long): Iterator[Long] = new AbstractIterator[Long] { private[this] var first = true diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala index 3c934011d3..7084c138ce 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala @@ -34,7 +34,7 @@ object HandshakeRestartReceiverSpec extends MultiNodeConfig { class Subject extends Actor { def receive = { case "shutdown" ⇒ context.system.terminate() - case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self) + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid → self) } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index 1e2a2bb39b..fa0ad212ad 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -122,10 +122,10 @@ object LatencySpec extends MultiNodeConfig { } final case class TestSettings( - testName: String, + testName: String, messageRate: Int, // msg/s payloadSize: Int, - repeat: Int) + repeat: Int) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 5e395187ed..77e0039614 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -147,10 +147,10 @@ object MaxThroughputSpec extends MultiNodeConfig { } final case class TestSettings( - testName: String, - totalMessages: Long, - burstSize: Int, - payloadSize: Int, + testName: String, + totalMessages: Long, + burstSize: Int, + payloadSize: Int, senderReceiverPairs: Int) } @@ -240,7 +240,8 @@ abstract class MaxThroughputSpec val senders = for (n ← 1 to senderReceiverPairs) yield { val receiver = identifyReceiver(receiverName + n) val plotProbe = TestProbe() - val snd = system.actorOf(senderProps(receiver, testSettings, plotProbe.ref), + val snd = system.actorOf( + senderProps(receiver, testSettings, plotProbe.ref), testName + "-snd" + n) val terminationProbe = TestProbe() terminationProbe.watch(snd) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala index 08858e62f1..01033b679c 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala @@ -6,7 +6,7 @@ package akka.remote.artery final case class PlotResult(values: Vector[(String, Number)] = Vector.empty) { def add(key: String, value: Number): PlotResult = - copy(values = values :+ (key -> value)) + copy(values = values :+ (key → value)) def addAll(p: PlotResult): PlotResult = copy(values ++ p.values) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala index f2402fae57..980aa51908 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -48,7 +48,7 @@ object RemoteRestartedQuarantinedSpec extends MultiNodeConfig { class Subject extends Actor { def receive = { case "shutdown" ⇒ context.system.terminate() - case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self) + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid → self) } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala index 2cef0d0ec6..6446aa7404 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala @@ -6,7 +6,8 @@ package akka.remote.artery import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.Executors -class TestRateReporter(name: String) extends RateReporter(SECONDS.toNanos(1), +class TestRateReporter(name: String) extends RateReporter( + SECONDS.toNanos(1), new RateReporter.Reporter { override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { println(name + diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 62bf1e9ecf..a5d5338395 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -71,11 +71,11 @@ import akka.stream.ActorMaterializerSettings * INTERNAL API */ private[akka] final case class InboundEnvelope( - recipient: InternalActorRef, + recipient: InternalActorRef, recipientAddress: Address, - message: AnyRef, - senderOption: Option[ActorRef], - originUid: Long) + message: AnyRef, + senderOption: Option[ActorRef], + originUid: Long) /** * INTERNAL API @@ -123,9 +123,9 @@ private[akka] object AssociationState { * INTERNAL API */ private[akka] final class AssociationState( - val incarnation: Int, + val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: Set[Long]) { + val quarantined: Set[Long]) { /** * Full outbound address with UID for this association. @@ -239,7 +239,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val systemMessageResendInterval: FiniteDuration = 1.second private val handshakeRetryInterval: FiniteDuration = 1.second private val handshakeTimeout: FiniteDuration = - system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero, + system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring( + _ > Duration.Zero, "handshake-timeout must be > 0") private val injectHandshakeInterval: FiniteDuration = 1.second private val giveUpSendAfter: FiniteDuration = 60.seconds diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 8e9db7092d..2064c67cdd 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -44,11 +44,11 @@ import akka.util.{ Unsafe, WildcardTree } * remote address. */ private[akka] class Association( - val transport: ArteryTransport, - val materializer: Materializer, - override val remoteAddress: Address, + val transport: ArteryTransport, + val materializer: Materializer, + override val remoteAddress: Address, override val controlSubject: ControlMessageSubject, - largeMessageDestinations: WildcardTree[NotUsed]) + largeMessageDestinations: WildcardTree[NotUsed]) extends AbstractAssociation with OutboundContext { private val log = Logging(transport.system, getClass.getName) @@ -103,7 +103,8 @@ private[akka] class Association( Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState] def completeHandshake(peer: UniqueAddress): Unit = { - require(remoteAddress == peer.address, + require( + remoteAddress == peer.address, s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}") val current = associationState current.uniqueRemoteAddressPromise.trySuccess(peer) @@ -114,7 +115,8 @@ private[akka] class Association( if (swapState(current, newState)) { current.uniqueRemoteAddressValue() match { case Some(Success(old)) ⇒ - log.debug("Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", + log.debug( + "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", newState.incarnation, peer.address, peer.uid, old.uid) case _ ⇒ // Failed, nothing to do @@ -190,7 +192,8 @@ private[akka] class Association( val newState = current.newQuarantined() if (swapState(current, newState)) { // quarantine state change was performed - log.warning("Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", + log.warning( + "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", remoteAddress, u, reason) // end delivery of system messages to that incarnation after this point send(ClearSystemMessageDelivery, None, dummyRecipient) @@ -200,10 +203,12 @@ private[akka] class Association( quarantine(reason, uid) // recursive } case Some(Success(peer)) ⇒ - log.debug("Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}", + log.debug( + "Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}", remoteAddress, u, peer.uid, reason) case None ⇒ - log.debug("Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}", + log.debug( + "Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}", remoteAddress, reason) } case None ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 1457373146..eb6470e73b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -15,9 +15,9 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } // TODO: Long UID class Encoder( uniqueLocalAddress: UniqueAddress, - system: ActorSystem, - compressionTable: LiteralCompressionTable, - pool: EnvelopeBufferPool) + system: ActorSystem, + compressionTable: LiteralCompressionTable, + pool: EnvelopeBufferPool) extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { val in: Inlet[Send] = Inlet("Artery.Encoder.in") @@ -108,11 +108,11 @@ class Encoder( } class Decoder( - uniqueLocalAddress: UniqueAddress, - system: ExtendedActorSystem, + uniqueLocalAddress: UniqueAddress, + system: ExtendedActorSystem, resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, - compressionTable: LiteralCompressionTable, - pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { + compressionTable: LiteralCompressionTable, + pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out") val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) @@ -172,7 +172,8 @@ class Decoder( push(out, decoded) } catch { case NonFatal(e) ⇒ - log.warning("Failed to deserialize message with serializer id [{}] and manifest [{}]. {}", + log.warning( + "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}", headerBuilder.serializer, headerBuilder.manifest, e.getMessage) pull(in) } finally { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 8be3029670..d1f96ce91b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -198,14 +198,16 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt push(out, env) else { // FIXME remove, only debug - log.warning(s"Dropping message [{}] from unknown system with UID [{}]. " + - "This system with UID [{}] was probably restarted. " + - "Messages will be accepted when new handshake has been completed.", - env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid) - if (log.isDebugEnabled) - log.debug(s"Dropping message [{}] from unknown system with UID [{}]. " + + log.warning( + s"Dropping message [{}] from unknown system with UID [{}]. " + "This system with UID [{}] was probably restarted. " + "Messages will be accepted when new handshake has been completed.", + env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid) + if (log.isDebugEnabled) + log.debug( + s"Dropping message [{}] from unknown system with UID [{}]. " + + "This system with UID [{}] was probably restarted. " + + "Messages will be accepted when new handshake has been completed.", env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid) pull(in) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala index 2cdb2e3542..531e9c4aff 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala @@ -33,7 +33,8 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten push(out, env) case association ⇒ if (association.associationState.isQuarantined(env.originUid)) { - inboundContext.sendControl(association.remoteAddress, + inboundContext.sendControl( + association.remoteAddress, Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid))) pull(in) } else diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala index a76a4b478b..8e871c3f33 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala @@ -21,16 +21,17 @@ import akka.remote.RemoteRef * INTERNAL API */ private[akka] class MessageDispatcher( - system: ExtendedActorSystem, + system: ExtendedActorSystem, provider: RemoteActorRefProvider) { private val remoteDaemon = provider.remoteDaemon private val log = Logging(system, getClass.getName) - def dispatch(recipient: InternalActorRef, - recipientAddress: Address, - message: AnyRef, - senderOption: Option[ActorRef]): Unit = { + def dispatch( + recipient: InternalActorRef, + recipientAddress: Address, + message: AnyRef, + senderOption: Option[ActorRef]): Unit = { import provider.remoteSettings._ @@ -54,8 +55,9 @@ private[akka] class MessageDispatcher( case sel: ActorSelectionMessage ⇒ if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) || sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian)) - log.debug("operating in UntrustedMode, dropping inbound actor selection to [{}], " + - "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration", + log.debug( + "operating in UntrustedMode, dropping inbound actor selection to [{}], " + + "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration", sel.elements.mkString("/", "/", "")) else // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor @@ -72,10 +74,12 @@ private[akka] class MessageDispatcher( // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) r.!(message)(sender) else - log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]", + log.error( + "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]", message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", ")) - case r ⇒ log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]", + case r ⇒ log.error( + "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]", message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", ")) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 472ce89fe7..4f1766821e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -44,8 +44,8 @@ private[akka] object SystemMessageDelivery { */ private[akka] class SystemMessageDelivery( outboundContext: OutboundContext, - resendInterval: FiniteDuration, - maxBufferSize: Int) + resendInterval: FiniteDuration, + maxBufferSize: Int) extends GraphStage[FlowShape[Send, Send]] { import SystemMessageDelivery._ diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index 67b5ce8a3a..0184920d37 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -9,19 +9,19 @@ class EnvelopeBufferSpec extends AkkaSpec { object TestCompressor extends LiteralCompressionTable { val refToIdx = Map( - "compressable0" -> 0, - "compressable1" -> 1, - "reallylongcompressablestring" -> 2) + "compressable0" → 0, + "compressable1" → 1, + "reallylongcompressablestring" → 2) val idxToRef = refToIdx.map(_.swap) val serializerToIdx = Map( - "serializer0" -> 0, - "serializer1" -> 1) + "serializer0" → 0, + "serializer1" → 1) val idxToSer = serializerToIdx.map(_.swap) val manifestToIdx = Map( - "manifest0" -> 0, - "manifest1" -> 1) + "manifest0" → 0, + "manifest1" → 1) val idxToManifest = manifestToIdx.map(_.swap) override def compressActorRef(ref: String): Int = refToIdx.getOrElse(ref, -1) diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index df5f2d8189..27bbd3e13b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -33,7 +33,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { private def setupStream( outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds, - retryInterval: FiniteDuration = 10.seconds, + retryInterval: FiniteDuration = 10.seconds, injectHandshakeInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = { val destination = null.asInstanceOf[RemoteActorRef] // not used diff --git a/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala index 6b104257fe..5c3364da39 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SerializationErrorSpec.scala @@ -75,8 +75,8 @@ class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) wit EventFilter.warning( start = "Failed to deserialize message with serializer id [4]", occurrences = 1).intercept { - remoteRef ! "boom".getBytes("utf-8") - }(systemB) + remoteRef ! "boom".getBytes("utf-8") + }(systemB) remoteRef ! "ping2" expectMsg("ping2") diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 561150f8fc..bbc9cefaa2 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -21,9 +21,9 @@ import akka.remote.artery.InboundControlJunction.ControlMessageSubject private[akka] class TestInboundContext( override val localAddress: UniqueAddress, - val controlSubject: TestControlMessageSubject = new TestControlMessageSubject, - val controlProbe: Option[ActorRef] = None, - val replyDropRate: Double = 0.0) extends InboundContext { + val controlSubject: TestControlMessageSubject = new TestControlMessageSubject, + val controlProbe: Option[ActorRef] = None, + val replyDropRate: Double = 0.0) extends InboundContext { private val associationsByAddress = new ConcurrentHashMap[Address, OutboundContext]() private val associationsByUid = new ConcurrentHashMap[Long, OutboundContext]() @@ -58,10 +58,10 @@ private[akka] class TestInboundContext( } private[akka] class TestOutboundContext( - override val localAddress: UniqueAddress, - override val remoteAddress: Address, + override val localAddress: UniqueAddress, + override val remoteAddress: Address, override val controlSubject: TestControlMessageSubject, - val controlProbe: Option[ActorRef] = None) extends OutboundContext { + val controlProbe: Option[ActorRef] = None) extends OutboundContext { // access to this is synchronized (it's a test utility) private var _associationState = AssociationState() @@ -117,8 +117,8 @@ private[akka] class TestControlMessageSubject extends ControlMessageSubject { } private[akka] class ManualReplyInboundContext( - replyProbe: ActorRef, - localAddress: UniqueAddress, + replyProbe: ActorRef, + localAddress: UniqueAddress, controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) { private var lastReply: Option[(Address, ControlMessage)] = None diff --git a/project/MiMa.scala b/project/MiMa.scala index 0d2cd9941b..0a96b84a24 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -747,9 +747,6 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.DefaultSSLContextCreation.validateAndWarnAboutLooseSettings") ), "2.4.4" -> Seq( - // Remove useUntrustedMode which is an internal API and not used anywhere anymore - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"), // #20080, #20081 remove race condition on HTTP client ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.Http#HostConnectionPool.gatewayFuture"), @@ -851,6 +848,11 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.client.ClusterClient.initialContactsSel") ), "2.4.6" -> Seq( + + // Remove useUntrustedMode which is an internal API and not used anywhere anymore + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"), + // internal api FilterAnyProblemStartingWith("akka.stream.impl"),