From e390f1397ba922d61a14abc5543ce74d4d459298 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Tue, 16 Apr 2019 20:26:09 +0200 Subject: [PATCH] Fix warnings in akka-remote (#26735) --- .../remote/PhiAccrualFailureDetector.scala | 7 +- .../akka/remote/RemoteActorRefProvider.scala | 2 + .../scala/akka/remote/RemoteWatcher.scala | 10 ++- .../src/main/scala/akka/remote/Remoting.scala | 14 ++-- .../akka/remote/RemotingLifecycleEvent.scala | 6 +- .../scala/akka/remote/UniqueAddress.scala | 2 +- .../akka/remote/artery/Association.scala | 11 ++-- .../akka/remote/artery/RemoteInstrument.scala | 1 + .../scala/akka/remote/artery/SendQueue.scala | 2 +- .../aeron/ArteryAeronUdpTransport.scala | 1 + .../artery/compress/DecompressionTable.scala | 2 +- .../artery/tcp/ArteryTcpTransport.scala | 1 + .../remote/artery/tcp/SSLEngineProvider.scala | 4 +- .../remote/routing/RemoteRouterConfig.scala | 3 +- .../DaemonMsgCreateSerializer.scala | 1 + .../MessageContainerSerializer.scala | 1 + .../transport/AkkaProtocolTransport.scala | 21 +++--- .../FailureInjectorTransportAdapter.scala | 14 ++-- .../transport/ThrottlerTransportAdapter.scala | 3 + .../akka/remote/transport/Transport.scala | 3 +- .../remote/transport/netty/NettyHelpers.scala | 2 +- .../transport/netty/NettySSLSupport.scala | 1 + .../transport/netty/NettyTransport.scala | 15 ++++- .../remote/AccrualFailureDetectorSpec.scala | 3 + .../scala/akka/remote/AckedDeliverySpec.scala | 6 +- .../scala/akka/remote/ActorsLeakSpec.scala | 25 ++++--- .../test/scala/akka/remote/DaemonicSpec.scala | 5 +- .../remote/DeadlineFailureDetectorSpec.scala | 2 +- .../akka/remote/RemoteDeathWatchSpec.scala | 14 ++-- .../RemoteDeploymentWhitelistSpec.scala | 2 +- .../akka/remote/RemoteInitErrorSpec.scala | 2 +- .../scala/akka/remote/RemoteWatcherSpec.scala | 51 +++++++------- .../test/scala/akka/remote/RemotingSpec.scala | 58 ++++++++-------- .../remote/Ticket1978CommunicationSpec.scala | 15 +++-- .../TransientSerializationErrorSpec.scala | 3 +- .../artery/DuplicateHandshakeSpec.scala | 6 +- .../remote/artery/FlightRecorderSpec.scala | 8 +-- .../remote/artery/HandshakeRetrySpec.scala | 1 - .../artery/InboundControlJunctionSpec.scala | 1 - .../remote/artery/InboundHandshakeSpec.scala | 6 +- .../akka/remote/artery/LateConnectSpec.scala | 1 - .../remote/artery/LruBoundedCacheSpec.scala | 3 + .../remote/artery/MetadataCarryingSpec.scala | 4 +- .../remote/artery/OutboundHandshakeSpec.scala | 14 ++-- .../artery/OutboundIdleShutdownSpec.scala | 66 +++++++++---------- .../remote/artery/RemoteActorForSpec.scala | 4 +- .../remote/artery/RemoteDeathWatchSpec.scala | 6 +- .../remote/artery/RemoteFailureSpec.scala | 2 +- .../RemoteInstrumentsSerializationSpec.scala | 19 +++--- .../remote/artery/RemoteInstrumentsSpec.scala | 2 +- .../RemoteMessageSerializationSpec.scala | 10 +-- .../artery/RemoteSendConsistencySpec.scala | 6 +- .../remote/artery/RemoteWatcherSpec.scala | 49 +++++++------- .../akka/remote/artery/SendQueueSpec.scala | 4 +- .../artery/SystemMessageAckerSpec.scala | 6 +- .../akka/remote/artery/UntrustedSpec.scala | 12 ++-- .../remote/artery/aeron/AeronSinkSpec.scala | 2 +- .../compress/CompressionIntegrationSpec.scala | 8 +-- ...dshakeShouldDropCompressionTableSpec.scala | 7 +- .../akka/remote/artery/tcp/TlsTcpSpec.scala | 10 +-- .../AllowJavaSerializationOffSpec.scala | 2 +- .../DaemonMsgCreateSerializerSpec.scala | 28 ++------ .../MiscMessageSerializerSpec.scala | 4 +- .../SystemMessageSerializationSpec.scala | 2 +- .../remote/transport/AkkaProtocolSpec.scala | 12 ++-- .../transport/AkkaProtocolStressTest.scala | 7 +- .../transport/GenericTransportSpec.scala | 2 +- .../SwitchableLoggedBehaviorSpec.scala | 2 +- .../SystemMessageDeliveryStressTest.scala | 1 - .../remote/transport/TestTransportSpec.scala | 4 +- .../ThrottlerTransportAdapterSpec.scala | 8 +-- .../transport/netty/NettyTransportSpec.scala | 2 +- project/AkkaDisciplinePlugin.scala | 1 - 73 files changed, 352 insertions(+), 293 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index a8f5c9b722..45a92d623a 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -110,8 +110,10 @@ class PhiAccrualFailureDetector( /** * Implement using optimistic lockless concurrency, all state is represented * by this immutable case class and managed by an AtomicReference. + * + * Cannot be final due to https://github.com/scala/bug/issues/4440 */ - private final case class State(history: HeartbeatHistory, timestamp: Option[Long]) + private case class State(history: HeartbeatHistory, timestamp: Option[Long]) private val state = new AtomicReference[State](State(history = firstHeartbeat, timestamp = None)) @@ -147,7 +149,8 @@ class PhiAccrualFailureDetector( } else oldState.history } - val newState = oldState.copy(history = newHistory, timestamp = Some(timestamp)) // record new timestamp + // record new timestamp and possibly-amended history + val newState = oldState.copy(history = newHistory, timestamp = Some(timestamp)) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat() // recur diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 9053adb1f9..09be1bf577 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -28,6 +28,7 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.remote.serialization.ActorRefResolveThreadLocalCache import akka.remote.artery.tcp.ArteryTcpTransport import akka.serialization.Serialization +import com.github.ghik.silencer.silent /** * INTERNAL API @@ -603,6 +604,7 @@ private[akka] class RemoteActorRef private[akka] ( // used by artery to direct messages to separate specialized streams @volatile private[remote] var cachedSendQueueIndex: Int = -1 + @silent def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream s.headOption match { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index dce73d24e2..7b38da2b27 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -9,10 +9,11 @@ import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.event.AddressTerminatedTopic import akka.remote.artery.ArteryMessage + import scala.collection.mutable import scala.concurrent.duration._ - import akka.remote.artery.ArteryTransport +import com.github.ghik.silencer.silent /** * INTERNAL API @@ -106,7 +107,12 @@ private[akka] class RemoteWatcher( val (heartBeatMsg, selfHeartbeatRspMsg) = if (artery) (ArteryHeartbeat, ArteryHeartbeatRsp(AddressUidExtension(context.system).longAddressUid)) - else (Heartbeat, HeartbeatRsp(AddressUidExtension(context.system).addressUid)) + else { + // For classic remoting the 'int' part is sufficient + @silent + val addressUid = AddressUidExtension(context.system).addressUid + (Heartbeat, HeartbeatRsp(addressUid)) + } // actors that this node is watching, map of watchee -> Set(watchers) val watching = new mutable.HashMap[InternalActorRef, mutable.Set[InternalActorRef]]() diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 5c806076ae..6c595567e9 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -16,6 +16,7 @@ import akka.remote.transport._ import com.typesafe.config.Config import java.net.URLEncoder import java.util.concurrent.TimeoutException + import scala.collection.immutable.{ HashMap, Seq } import scala.concurrent.duration._ import scala.concurrent.{ Await, Future, Promise } @@ -23,12 +24,15 @@ import scala.util.control.NonFatal import scala.util.{ Failure, Success } import akka.remote.transport.AkkaPduCodec.Message import java.util.concurrent.ConcurrentHashMap + import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString.UTF_8 import akka.util.OptionVal + import scala.collection.immutable import akka.actor.ActorInitializationException import akka.util.ccompat._ +import com.github.ghik.silencer.silent /** * INTERNAL API @@ -127,6 +131,7 @@ private[remote] object Remoting { /** * INTERNAL API */ +@ccompatUsedUntil213 private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { @@ -524,7 +529,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) settings.QuarantineDuration match { case d: FiniteDuration => endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) - eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) + eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid.toLong)) case _ => // disabled } Stop @@ -653,7 +658,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) uidOption match { case Some(`quarantineUid`) => endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration) - eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid)) + eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid.toLong)) context.stop(endpoint) // or it does not match with the UID to be quarantined case None if !endpoints.refuseUid(address).contains(quarantineUid) => @@ -668,7 +673,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) case (_, Some(quarantineUid)) => // the current state is gated or quarantined, and we know the UID, update endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration) - eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid)) + eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid.toLong)) case _ => // the current state is Gated, WasGated or Quarantined, and we don't know the UID, do nothing. } @@ -755,7 +760,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) case Some(Pass(endpoint, _)) => if (refuseUidOption.contains(uid)) { endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) - eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) + eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid.toLong)) context.stop(endpoint) } else endpoints.registerWritableEndpointUid(remoteAddress, uid) handleStashedInbound(sender(), writerIsIdle = false) @@ -768,6 +773,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) case ShutdownAndFlush => // Shutdown all endpoints and signal to sender() when ready (and whether all endpoints were shut down gracefully) + @silent def shutdownAll[T](resources: IterableOnce[T])(shutdown: T => Future[Boolean]): Future[Boolean] = { Future.sequence(resources.toList.map(shutdown)).map(_.forall(identity)).recover { case NonFatal(_) => false diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index d792b2b260..a50e146f2d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -7,14 +7,17 @@ package akka.remote import akka.event.Logging.LogLevel import akka.event.{ Logging, LoggingAdapter } import akka.actor.{ ActorSystem, Address } +import com.github.ghik.silencer.silent import scala.runtime.AbstractFunction2 +@silent @SerialVersionUID(1L) sealed trait RemotingLifecycleEvent extends Serializable { def logLevel: Logging.LogLevel } +@silent @SerialVersionUID(1L) sealed trait AssociationEvent extends RemotingLifecycleEvent { def localAddress: Address @@ -101,8 +104,9 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot @deprecated("Use long uid", "2.4.x") def uid: Int = longUid.toInt + @silent @deprecated("Use long uid copy method", "2.4.x") - def copy(address: Address = address, uid: Int = uid) = new QuarantinedEvent(address, uid) + def copy(address: Address = address, uid: Int = uid) = new QuarantinedEvent(address, uid.toLong) } /** diff --git a/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala b/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala index c8028ef44f..09d2c23276 100644 --- a/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala +++ b/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala @@ -17,5 +17,5 @@ final case class UniqueAddress(address: Address, uid: Long) extends Ordered[Uniq } override def toString(): String = - address + "#" + uid + address.toString + "#" + uid } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 6c666c9c2c..3850f4469d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -16,7 +16,6 @@ import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ - import akka.{ Done, NotUsed } import akka.actor.ActorRef import akka.actor.ActorSelectionMessage @@ -40,15 +39,15 @@ import akka.stream.Materializer import akka.stream.scaladsl.Keep import akka.stream.scaladsl.MergeHub import akka.stream.scaladsl.Source -import akka.util.{ Unsafe, WildcardIndex } -import akka.util.OptionVal +import akka.util.{ OptionVal, Unsafe, WildcardIndex } import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.stream.SharedKillSwitch -import scala.util.control.NoStackTrace +import scala.util.control.NoStackTrace import akka.actor.Cancellable import akka.stream.StreamTcpException import akka.util.ccompat._ +import com.github.ghik.silencer.silent /** * INTERNAL API @@ -119,6 +118,7 @@ private[remote] object Association { * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific * remote address. */ +@ccompatUsedUntil213 private[remote] class Association( val transport: ArteryTransport, val materializer: Materializer, @@ -237,6 +237,7 @@ private[remote] class Association( * Holds reference to shared state of Association - *access only via helper methods* */ @volatile + @silent private[this] var _sharedStateDoNotCallMeDirectly: AssociationState = AssociationState() /** @@ -326,6 +327,7 @@ private[remote] class Association( outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender) // volatile read to see latest queue array + @silent val unused = queuesVisibility def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = { @@ -721,6 +723,7 @@ private[remote] class Association( } private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = { + @silent val unused = queuesVisibility // volatile read to see latest queues array queues(queueIndex) match { case existing: QueueWrapper => existing diff --git a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala index 59a39b05b6..4dc0c09768 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala @@ -26,6 +26,7 @@ import akka.util.ccompat._ * will be created for each encoder and decoder. It's only called from the operator, so if it doesn't * delegate to any shared instance it doesn't have to be thread-safe. */ +@ccompatUsedUntil213 abstract class RemoteInstrument { /** diff --git a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala index a36f5dd84e..8e55eaca93 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala @@ -100,7 +100,7 @@ private[remote] final class SendQueue[T](postStopAction: Vector[T] => Unit) } override def postStop(): Unit = { - var pending = Vector.newBuilder[T] + val pending = Vector.newBuilder[T] if (consumerQueue ne null) { var msg = consumerQueue.poll() while (msg != null) { diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala index 6ab6b8d839..f7347f63a6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala @@ -50,6 +50,7 @@ import org.agrona.concurrent.status.CountersReader.MetaData /** * INTERNAL API */ +@ccompatUsedUntil213 private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends ArteryTransport(_system, _provider) { import AeronSource.AeronLifecycle diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala index de5031e84e..2b57735ac5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala @@ -22,7 +22,7 @@ private[remote] final case class DecompressionTable[T](originUid: Long, version: } def invert: CompressionTable[T] = - CompressionTable(originUid, version, Map(table.zipWithIndex: _*)) + CompressionTable(originUid, version, table.zipWithIndex.toMap) /** Writes complete table as String (heavy operation) */ override def toString = diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 78c1f3438e..24ed0442d6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -62,6 +62,7 @@ private[remote] object ArteryTcpTransport { /** * INTERNAL API */ +@ccompatUsedUntil213 private[remote] class ArteryTcpTransport( _system: ExtendedActorSystem, _provider: RemoteActorRefProvider, diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala index bf74b15bcb..da17fd125c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/SSLEngineProvider.scala @@ -35,7 +35,9 @@ import javax.net.ssl.SSLSession import javax.net.ssl.TrustManager import javax.net.ssl.TrustManagerFactory -@ApiMayChange trait SSLEngineProvider { +@ApiMayChange +@ccompatUsedUntil213 +trait SSLEngineProvider { def createServerSSLEngine(hostname: String, port: Int): SSLEngine diff --git a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala index 77e9b28541..6224b1cd17 100644 --- a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala @@ -22,6 +22,7 @@ import akka.routing.Routee import akka.routing.Router import akka.routing.RouterActor import akka.routing.RouterConfig +import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory /** @@ -39,7 +40,7 @@ final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) exten def this(local: Pool, nodes: Array[Address]) = this(local, nodes: Iterable[Address]) // need this iterator as instance variable since Resizer may call createRoutees several times - @transient private val nodeAddressIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator + @silent @transient private val nodeAddressIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator // need this counter as instance variable since Resizer may call createRoutees several times @transient private val childNameCounter = new AtomicInteger diff --git a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala index be1f438a8a..0a16406621 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala @@ -27,6 +27,7 @@ import util.{ Failure, Success } * * INTERNAL API */ +@ccompatUsedUntil213 private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends BaseSerializer { import ProtobufSerializer.serializeActorRef import ProtobufSerializer.deserializeActorRef diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala index 7c92302f86..91713d1e09 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala @@ -16,6 +16,7 @@ import akka.remote.ContainerFormats import akka.serialization.{ BaseSerializer, SerializationExtension, Serializers } import akka.util.ccompat._ +@ccompatUsedUntil213 class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer { private lazy val serialization = SerializationExtension(system) diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index ae20e601d2..4ec49db95e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -27,6 +27,7 @@ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.event.{ LogMarker, Logging } +import com.github.ghik.silencer.silent @SerialVersionUID(1L) class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace { @@ -148,13 +149,15 @@ private[transport] class AkkaProtocolManager( val stateActorAssociationHandler = associationListener val stateActorSettings = settings val failureDetector = createTransportFailureDetector() + + // Using the 'int' addressUid rather than the 'long' is sufficient for Classic Remoting + @silent + val addressUid = AddressUidExtension(context.system).addressUid + context.actorOf( RARP(context.system).configureDispatcher( ProtocolStateActor.inboundProps( - HandshakeInfo( - stateActorLocalAddress, - AddressUidExtension(context.system).addressUid, - stateActorSettings.SecureCookie), + HandshakeInfo(stateActorLocalAddress, addressUid, stateActorSettings.SecureCookie), handle, stateActorAssociationHandler, stateActorSettings, @@ -178,13 +181,15 @@ private[transport] class AkkaProtocolManager( val stateActorSettings = settings val stateActorWrappedTransport = wrappedTransport val failureDetector = createTransportFailureDetector() + + // Using the 'int' addressUid rather than the 'long' is sufficient for Classic Remoting + @silent + val addressUid = AddressUidExtension(context.system).addressUid + context.actorOf( RARP(context.system).configureDispatcher( ProtocolStateActor.outboundProps( - HandshakeInfo( - stateActorLocalAddress, - AddressUidExtension(context.system).addressUid, - stateActorSettings.SecureCookie), + HandshakeInfo(stateActorLocalAddress, addressUid, stateActorSettings.SecureCookie), remoteAddress, statusPromise, stateActorWrappedTransport, diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 32bf87081d..e6a53a889b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -14,6 +14,8 @@ import akka.util.ByteString import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom +import com.github.ghik.silencer.silent + import scala.concurrent.{ Future, Promise } import scala.util.control.NoStackTrace @@ -35,6 +37,7 @@ private[remote] object FailureInjectorTransportAdapter { trait FailureInjectorCommand @SerialVersionUID(1L) + @deprecated("Not implemented", "2.5.22") final case class All(mode: GremlinMode) @SerialVersionUID(1L) final case class One(remoteAddress: Address, mode: GremlinMode) @@ -67,15 +70,14 @@ private[remote] class FailureInjectorTransportAdapter( @volatile private var upstreamListener: Option[AssociationEventListener] = None private[transport] val addressChaosTable = new ConcurrentHashMap[Address, GremlinMode]() - @volatile private var allMode: GremlinMode = PassThru override val addedSchemeIdentifier = FailureInjectorSchemeIdentifier protected def maximumOverhead = 0 override def managementCommand(cmd: Any): Future[Boolean] = cmd match { - case All(mode) => - allMode = mode - Future.successful(true) + case All(_) => + Future.failed( + new IllegalArgumentException("Setting the mode for all addresses at once is not currently implemented")) case One(address, mode) => // don't care about the protocol part - we are injected in the stack anyway! addressChaosTable.put(address.copy(protocol = "", system = ""), mode) @@ -179,6 +181,10 @@ private[remote] final case class FailureInjectorHandle( override def disassociate(reason: String, log: LoggingAdapter): Unit = wrappedHandle.disassociate(reason, log) + @deprecated( + message = "Use method that states reasons to make sure disassociation reasons are logged.", + since = "2.5.3") + @silent override def disassociate(): Unit = wrappedHandle.disassociate() diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index c253627164..a58887d0f6 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -32,6 +32,7 @@ import scala.util.control.NonFatal import akka.dispatch.sysmsg.{ Unwatch, Watch } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.remote.RARP +import com.github.ghik.silencer.silent class ThrottlerProvider extends TransportAdapterProvider { @@ -342,6 +343,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) Future.successful(SetThrottleAck) } + // silent because of use of isTerminated + @silent private def askModeWithDeathCompletion(target: ActorRef, mode: ThrottleMode)( implicit timeout: Timeout): Future[SetThrottleAck.type] = { if (target.isTerminated) Future.successful(SetThrottleAck) diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 72b30bc0be..0ce5910295 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -6,13 +6,13 @@ package akka.remote.transport import scala.concurrent.{ Future, Promise } import scala.util.control.NoStackTrace - import akka.actor.{ ActorRef, Address, NoSerializationVerificationNeeded } import akka.util.{ unused, ByteString } import akka.remote.transport.AssociationHandle.HandleEventListener import akka.AkkaException import akka.actor.DeadLetterSuppression import akka.event.LoggingAdapter +import com.github.ghik.silencer.silent object Transport { @@ -278,6 +278,7 @@ trait AssociationHandle { * be notified, but this is not guaranteed. The Transport that provides the handle MUST guarantee that disassociate() * could be called arbitrarily many times. */ + @silent def disassociate(reason: String, log: LoggingAdapter): Unit = { if (log.isDebugEnabled) log.debug( diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index 5789a97769..6647ba3dcd 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -27,7 +27,7 @@ private[netty] trait NettyHelpers { protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = () - final protected def transformException(@unused ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = { + final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = { val cause = if (ev.getCause ne null) ev.getCause else new AkkaException("Unknown cause") cause match { case _: ClosedChannelException => // Ignore diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala index cd5a536f8d..54ce9d1c7d 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala @@ -41,6 +41,7 @@ private[akka] class SSLSettings(config: Config) { * Used for adding SSL support to Netty pipeline. * The `SSLEngine` is created via the configured [[SSLEngineProvider]]. */ +@ccompatUsedUntil213 private[akka] object NettySSLSupport { /** diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 7b5e46016b..0ed37868c4 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -20,7 +20,6 @@ import scala.concurrent.blocking import scala.util.Try import scala.util.control.NoStackTrace import scala.util.control.NonFatal - import akka.actor.ActorSystem import akka.actor.Address import akka.actor.ExtendedActorSystem @@ -39,6 +38,7 @@ import akka.util.Helpers.Requiring import akka.util.OptionVal import akka.ConfigurationException import akka.OnlyCauseStackTrace +import com.github.ghik.silencer.silent import com.typesafe.config.Config import org.jboss.netty.bootstrap.Bootstrap import org.jboss.netty.bootstrap.ClientBootstrap @@ -181,6 +181,7 @@ class NettyTransportSettings(config: Config) { val PortSelector: Int = getInt("port") @deprecated("WARNING: This should only be used by professionals.", "2.4") + @silent val BindPortSelector: Int = getString("bind-port") match { case "" => PortSelector case value => value.toInt @@ -465,6 +466,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA override def getPipeline: ChannelPipeline = { val pipeline = newPipeline if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = false)) + @silent val handler = if (isDatagram) new UdpServerHandler(NettyTransport.this, associationListenerPromise.future) else new TcpServerHandler(NettyTransport.this, associationListenerPromise.future, log) @@ -478,6 +480,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA override def getPipeline: ChannelPipeline = { val pipeline = newPipeline if (EnableSsl) pipeline.addFirst("SslHandler", sslHandler(isClient = true)) + @silent val handler = if (isDatagram) new UdpClientHandler(NettyTransport.this, remoteAddress) else new TcpClientHandler(NettyTransport.this, remoteAddress, log) @@ -530,8 +533,11 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } override def listen: Future[(Address, Promise[AssociationEventListener])] = { + @silent + val bindPort = settings.BindPortSelector + for { - address <- addressToSocketAddress(Address("", "", settings.BindHostname, settings.BindPortSelector)) + address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort)) } yield { try { val newServerChannel = inboundBootstrap match { @@ -545,12 +551,15 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA serverChannel = newServerChannel + @silent + val port = if (settings.PortSelector == 0) None else Some(settings.PortSelector) + addressFromSocketAddress( newServerChannel.getLocalAddress, schemeIdentifier, system.name, Some(settings.Hostname), - if (settings.PortSelector == 0) None else Some(settings.PortSelector)) match { + port) match { case Some(address) => addressFromSocketAddress(newServerChannel.getLocalAddress, schemeIdentifier, system.name, None, None) match { case Some(address) => boundTo = address diff --git a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala index 46d6fa5403..e97fc7ca27 100644 --- a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala @@ -5,10 +5,13 @@ package akka.remote import akka.testkit.AkkaSpec + import scala.collection.immutable.TreeMap import scala.concurrent.duration._ import akka.remote.FailureDetector.Clock +import com.github.ghik.silencer.silent +@silent class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { "An AccrualFailureDetector" must { diff --git a/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala index d2d6891b19..485a3b671d 100644 --- a/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala @@ -257,7 +257,7 @@ class AckedDeliverySpec extends AkkaSpec { def happened(p: Double) = ThreadLocalRandom.current().nextDouble() < p - @tailrec def geom(p: Double, limit: Int = 5, acc: Int = 0): Int = + @tailrec def geom(p: Double, limit: Int, acc: Int = 0): Int = if (acc == limit) acc else if (happened(p)) acc else geom(p, limit, acc + 1) @@ -278,7 +278,7 @@ class AckedDeliverySpec extends AkkaSpec { def dbgLog(message: String): Unit = log :+= message - def senderSteps(steps: Int, p: Double = 1.0) = { + def senderSteps(steps: Int, p: Double) = { val resends = (sndBuf.nacked ++ sndBuf.nonAcked).take(steps) val sends = if (steps - resends.size > 0) { @@ -300,7 +300,7 @@ class AckedDeliverySpec extends AkkaSpec { } } - def receiverStep(p: Double = 1.0) = { + def receiverStep(p: Double) = { if (happened(p)) { sndBuf = sndBuf.acknowledge(lastAck) dbgLog(s"$sndBuf <-- $lastAck -- $rcvBuf") diff --git a/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala index e72c6e59c4..0301852f9b 100644 --- a/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala @@ -11,6 +11,7 @@ import akka.actor.dungeon.ChildrenContainer import akka.remote.transport.ThrottlerTransportAdapter.ForceDisassociate import akka.testkit._ import akka.testkit.TestActors.EchoActor +import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory import scala.collection.immutable @@ -30,7 +31,7 @@ object ActorsLeakSpec { | |""".stripMargin) - def collectLiveActors(root: ActorRef): immutable.Seq[ActorRef] = { + def collectLiveActors(root: Option[ActorRef]): immutable.Seq[ActorRef] = { def recurse(node: ActorRef): List[ActorRef] = { val children: List[ActorRef] = node match { @@ -38,10 +39,10 @@ object ActorsLeakSpec { val cell = wc.underlying cell.childrenRefs match { - case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason) => Nil - case x @ (ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer) => Nil - case n: ChildrenContainer.NormalChildrenContainer => cell.childrenRefs.children.toList - case x => Nil + case ChildrenContainer.TerminatingChildrenContainer(_, _, _) => Nil + case ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer => Nil + case _: ChildrenContainer.NormalChildrenContainer => cell.childrenRefs.children.toList + case _ => Nil } case _ => Nil } @@ -49,7 +50,10 @@ object ActorsLeakSpec { node :: children.flatMap(recurse) } - recurse(root) + root match { + case Some(node) => recurse(node) + case None => immutable.Seq.empty + } } class StoppableActor extends Actor { @@ -66,12 +70,12 @@ class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender "Remoting" must { "not leak actors" in { - val ref = system.actorOf(Props[EchoActor], "echo") + system.actorOf(Props[EchoActor], "echo") val echoPath = RootActorPath(RARP(system).provider.getDefaultAddress) / "user" / "echo" val targets = List("/system/endpointManager", "/system/transports").map { path => system.actorSelection(path) ! Identify(0) - expectMsgType[ActorIdentity].getRef + expectMsgType[ActorIdentity].ref } val initialActors = targets.flatMap(collectLiveActors).toSet @@ -114,8 +118,9 @@ class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender val beforeQuarantineActors = targets.flatMap(collectLiveActors).toSet // it must not quarantine the current connection - RARP(system).provider.transport - .quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1), "test") + @silent + val addressUid = AddressUidExtension(remoteSystem).addressUid + 1 + RARP(system).provider.transport.quarantine(remoteAddress, Some(addressUid), "test") // the message from local to remote should reuse passive inbound connection system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1) diff --git a/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala b/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala index d21fbad6b2..7df5a39bf6 100644 --- a/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala @@ -5,12 +5,15 @@ package akka.remote import akka.testkit._ + import scala.concurrent.duration._ import akka.actor.{ ActorSystem, Address } import akka.util.ccompat._ import com.typesafe.config.ConfigFactory + import scala.collection.JavaConverters._ +@ccompatUsedUntil213 class DaemonicSpec extends AkkaSpec { "Remoting configured with daemonic = on" must { @@ -40,7 +43,7 @@ class DaemonicSpec extends AkkaSpec { // get new non daemonic threads running awaitAssert({ val newNonDaemons: Set[Thread] = - Thread.getAllStackTraces.keySet().asScala.seq.filter(t => !origThreads(t) && !t.isDaemon).to(Set) + Thread.getAllStackTraces.keySet().asScala.filter(t => !origThreads(t) && !t.isDaemon).to(Set) newNonDaemons should ===(Set.empty[Thread]) }, 4.seconds) diff --git a/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala index a958268663..4aafbfc8ab 100644 --- a/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/DeadlineFailureDetectorSpec.scala @@ -22,7 +22,7 @@ class DeadlineFailureDetectorSpec extends AkkaSpec { } } - def createFailureDetector(acceptableLostDuration: FiniteDuration, clock: Clock = FailureDetector.defaultClock) = + def createFailureDetector(acceptableLostDuration: FiniteDuration, clock: Clock) = new DeadlineFailureDetector(acceptableLostDuration, heartbeatInterval = 1.second)(clock = clock) "mark node as monitored after a series of successful heartbeats" in { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index 4542d7e389..9cbdd0afe5 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -8,9 +8,11 @@ import akka.testkit._ import akka.actor._ import com.typesafe.config.ConfigFactory import akka.actor.RootActorPath + import scala.concurrent.duration._ import akka.testkit.SocketUtil import akka.event.Logging.Warning +import com.github.ghik.silencer.silent class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString(""" @@ -68,17 +70,19 @@ akka { expectMsg(20.seconds, ref) // we don't expect real quarantine when the UID is unknown, i.e. QuarantinedEvent is not published - probe.expectNoMsg(3.seconds) + probe.expectNoMessage(3.seconds) // The following verifies ticket #3870, i.e. make sure that re-delivery of Watch message is stopped. // It was observed as periodic logging of "address is now gated" when the gate was lifted. system.eventStream.subscribe(probe.ref, classOf[Warning]) - probe.expectNoMsg(rarp.remoteSettings.RetryGateClosedFor * 2) + probe.expectNoMessage(rarp.remoteSettings.RetryGateClosedFor * 2) } "receive Terminated when watched node is unknown host" in { val path = RootActorPath(Address(protocol, system.name, "unknownhost", 2552)) / "user" / "subject" system.actorOf(Props(new Actor { - context.watch(context.actorFor(path)) + @silent + val watchee = context.actorFor(path) + context.watch(watchee) def receive = { case t: Terminated => testActor ! t.actor.path } @@ -110,9 +114,9 @@ akka { probe.watch(extinctRef) probe.unwatch(extinctRef) - probe.expectNoMsg(5.seconds) + probe.expectNoMessage(5.seconds) system.eventStream.subscribe(probe.ref, classOf[Warning]) - probe.expectNoMsg(RARP(system).provider.remoteSettings.RetryGateClosedFor * 2) + probe.expectNoMessage(RARP(system).provider.remoteSettings.RetryGateClosedFor * 2) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeploymentWhitelistSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeploymentWhitelistSpec.scala index 9b0dbd5ab4..0196314437 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeploymentWhitelistSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeploymentWhitelistSpec.scala @@ -154,7 +154,7 @@ class RemoteDeploymentWhitelistSpec r.path.toString should ===( s"akka.test://remote-sys@localhost:12346/remote/akka.test/${getClass.getSimpleName}@localhost:12345/user/danger-mouse") r ! 42 - expectNoMsg(1.second) + expectNoMessage(1.second) system.stop(r) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala index 7fc39fccd3..1bf88bda40 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala @@ -48,7 +48,7 @@ class RemoteInitErrorSpec extends WordSpec with Matchers { ActorSystem("duplicate", ConfigFactory.parseString("akka.loglevel=OFF").withFallback(conf)) fail("initialization should fail due to invalid IP address") } catch { - case NonFatal(e) => { + case NonFatal(_) => { eventually(timeout(30 seconds), interval(800 milliseconds)) { val current = currentThreadIds() // no new threads should remain compared to the start state diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 5b53509156..a82fd5de28 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -8,6 +8,7 @@ import language.postfixOps import scala.concurrent.duration._ import akka.testkit._ import akka.actor._ +import com.github.ghik.silencer.silent object RemoteWatcherSpec { @@ -81,6 +82,7 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { val remoteSystem = ActorSystem("RemoteSystem", system.settings.config) val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + @silent def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid Seq(system, remoteSystem).foreach( @@ -103,8 +105,6 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { "A RemoteWatcher" must { "have correct interaction when watching" in { - - val fd = createFailureDetector() val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1") @@ -119,48 +119,48 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { monitorA ! Stats // (a1->b1), (a1->b2), (a2->b2) expectMsg(Stats.counts(watching = 3, watchingNodes = 1)) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA.tell(heartbeatRspB, monitorB) monitorA ! HeartbeatTick expectMsg(Heartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! UnwatchRemote(b1, a1) // still (a1->b2) and (a2->b2) left monitorA ! Stats expectMsg(Stats.counts(watching = 2, watchingNodes = 1)) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! UnwatchRemote(b2, a2) // still (a1->b2) left monitorA ! Stats expectMsg(Stats.counts(watching = 1, watchingNodes = 1)) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(Heartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! UnwatchRemote(b2, a1) // all unwatched monitorA ! Stats expectMsg(Stats.empty) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick - expectNoMsg(100 millis) + expectNoMessage(100 millis) // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } "generate AddressTerminated when missing heartbeats" taggedAs LongRunningTest in { @@ -180,7 +180,7 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA.tell(heartbeatRspB, monitorB) - expectNoMsg(1 second) + expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA.tell(heartbeatRspB, monitorB) @@ -197,7 +197,7 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { } // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } "generate AddressTerminated when missing first heartbeat" taggedAs LongRunningTest in { @@ -206,10 +206,9 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) - val fd = createFailureDetector() val heartbeatExpectedResponseAfter = 2.seconds val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5") - val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") + createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") val a = system.actorOf(Props[MyActor], "a5").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b5") @@ -233,7 +232,7 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { } // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } "generate AddressTerminated for new watch after broken connection that was re-established and broken again" taggedAs LongRunningTest in { @@ -253,7 +252,7 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA.tell(heartbeatRspB, monitorB) - expectNoMsg(1 second) + expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA.tell(heartbeatRspB, monitorB) @@ -275,7 +274,7 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { monitorA ! Stats expectMsg(Stats.empty) } - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) // assume that connection comes up again, or remote system is restarted val c = createRemoteActor(Props[MyActor], "c6") @@ -285,22 +284,22 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA.tell(heartbeatRspB, monitorB) - expectNoMsg(1 second) + expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA.tell(heartbeatRspB, monitorB) monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA ! ReapUnreachableTick - p.expectNoMsg(1 second) + p.expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA.tell(heartbeatRspB, monitorB) monitorA ! HeartbeatTick expectMsg(Heartbeat) monitorA ! ReapUnreachableTick - p.expectNoMsg(1 second) - q.expectNoMsg(1 second) + p.expectNoMessage(1 second) + q.expectNoMessage(1 second) // then stop heartbeating again, should generate new AddressTerminated within(10 seconds) { @@ -315,7 +314,7 @@ class RemoteWatcherSpec extends AkkaSpec("""akka { } // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index e2e20bc8e3..70b9d08f75 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -21,6 +21,7 @@ import scala.concurrent.duration._ import java.util.concurrent.ThreadLocalRandom import akka.testkit.SocketUtil.temporaryServerAddress +import com.github.ghik.silencer.silent object RemotingSpec { @@ -31,11 +32,15 @@ object RemotingSpec { var target: ActorRef = context.system.deadLetters def receive = { - case (p: Props, n: String) => sender() ! context.actorOf(Props[Echo1], n) + case (_: Props, n: String) => sender() ! context.actorOf(Props[Echo1], n) case ex: Exception => throw ex - case ActorForReq(s) => sender() ! context.actorFor(s) - case ActorSelReq(s) => sender() ! context.actorSelection(s) - case x => target = sender(); sender() ! x + case ActorForReq(s) => { + @silent + val actor = context.actorFor(s) + sender() ! actor + } + case ActorSelReq(s) => sender() ! context.actorSelection(s) + case x => target = sender(); sender() ! x } override def preStart(): Unit = {} @@ -129,6 +134,7 @@ object RemotingSpec { } } +@silent class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with DefaultTimeout { import RemotingSpec._ @@ -175,7 +181,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D try { bigBounceHere ! msg afterSend - expectNoMsg(500.millis.dilated) + expectNoMessage(500.millis.dilated) } finally { system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent]) system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent]) @@ -217,7 +223,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "support ask" in { Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: akka.pattern.PromiseActorRef) => // good + case ("pong", _: akka.pattern.PromiseActorRef) => // good case m => fail(m + " was not (pong, AskActorRef)") } } @@ -298,7 +304,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D echo ! PoisonPill expectMsg("postStop") echo ! 72 - expectNoMsg(1.second) + expectNoMessage(1.second) val echo2 = remoteSystem.actorOf(Props[Echo1], "otherEcho1") echo2 ! 73 @@ -306,7 +312,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D // msg to old ActorRef (different uid) should not get through echo2.path.uid should not be (echo.path.uid) echo ! 74 - expectNoMsg(1.second) + expectNoMessage(1.second) remoteSystem.actorFor("/user/otherEcho1") ! 75 expectMsg(75) @@ -325,7 +331,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D val l = system.actorOf(Props(new Actor { def receive = { case (p: Props, n: String) => sender() ! context.actorOf(p, n) - case ActorForReq(s) => sender() ! context.actorFor(s) + case ActorForReq(s) => { + sender() ! context.actorFor(s) + } } }), "looker1") // child is configured to be deployed on remote-sys (remoteSystem) @@ -361,7 +369,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D // msg to old ActorRef (different uid) should not get through child2.path.uid should not be (child.path.uid) child ! 46 - expectNoMsg(1.second) + expectNoMessage(1.second) system.actorFor(system / "looker1" / "child") ! 47 expectMsg(47) } @@ -455,7 +463,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D // msg to old ActorRef (different uid) should not get through child2.path.uid should not be (child.path.uid) child ! 56 - expectNoMsg(1.second) + expectNoMessage(1.second) system.actorSelection(system / "looker2" / "child") ! 57 expectMsg(57) } @@ -518,7 +526,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D object Unserializable EventFilter[NotSerializableException](pattern = ".*No configured serialization.*", occurrences = 1).intercept { verifySend(Unserializable) { - expectNoMsg(1.second) // No AssocitionErrorEvent should be published + expectNoMessage(1.second) // No AssocitionErrorEvent should be published } } } @@ -536,7 +544,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload sent.*", occurrences = 1) .intercept { verifySend(oversized) { - expectNoMsg(1.second) // No AssocitionErrorEvent should be published + expectNoMessage(1.second) // No AssocitionErrorEvent should be published } } } @@ -546,7 +554,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload received.*", occurrences = 1) .intercept { verifySend(maxPayloadBytes + 1) { - expectNoMsg(1.second) // No AssocitionErrorEvent should be published + expectNoMessage(1.second) // No AssocitionErrorEvent should be published } } } @@ -638,7 +646,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D val otherSelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere")) otherSelection.tell("ping", probe.ref) - probe.expectNoMsg(1.second) + probe.expectNoMessage(1.second) terminatedListener.lastMsg should be(null) @@ -678,9 +686,6 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D remoteTransportProbe.ref ! ev })) - val outboundHandle = - new TestAssociationHandle(rawLocalAddress, rawRemoteAddress, remoteTransport, inbound = false) - // Hijack associations through the test transport awaitCond(registry.transportsReady(rawLocalAddress, rawRemoteAddress)) val testTransport = registry.transportFor(rawLocalAddress).get._1 @@ -719,7 +724,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D inboundHandle.write(brokenPacket) // No disassociation now, the connection is still stashed - inboundHandleProbe.expectNoMsg(1.second) + inboundHandleProbe.expectNoMessage(1.second) // Finish the handshake for the outbound connection. This will unstash the inbound pending connection. remoteHandle.association.write(handshakePacket) @@ -763,9 +768,6 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D remoteTransportProbe.ref ! ev })) - val outboundHandle = - new TestAssociationHandle(rawLocalAddress, rawRemoteAddress, remoteTransport, inbound = false) - // Hijack associations through the test transport awaitCond(registry.transportsReady(rawLocalAddress, rawRemoteAddress)) val testTransport = registry.transportFor(rawLocalAddress).get._1 @@ -800,11 +802,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D inboundHandle.write(handshakePacket) // No disassociation now, the connection is still stashed - inboundHandleProbe.expectNoMsg(1.second) + inboundHandleProbe.expectNoMessage(1.second) // Quarantine unrelated connection RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1), "test") - inboundHandleProbe.expectNoMsg(1.second) + inboundHandleProbe.expectNoMessage(1.second) // Quarantine the connection RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID.toLong), "test") @@ -834,11 +836,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo") otherSelection.tell("ping", probeSender) - probe.expectNoMsg(1.seconds) + probe.expectNoMessage(1.seconds) val otherSystem = ActorSystem("other-system", otherConfig) try { muteSystem(otherSystem) - probe.expectNoMsg(2.seconds) + probe.expectNoMessage(2.seconds) otherSystem.actorOf(Props[Echo2], "echo") within(5.seconds) { awaitAssert { @@ -873,11 +875,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D val otherSelection = thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo") otherSelection.tell("ping", thisSender) - thisProbe.expectNoMsg(1.seconds) + thisProbe.expectNoMessage(1.seconds) val otherSystem = ActorSystem("other-system", otherConfig) try { muteSystem(otherSystem) - thisProbe.expectNoMsg(2.seconds) + thisProbe.expectNoMessage(2.seconds) val otherProbe = new TestProbe(otherSystem) val otherSender = otherProbe.ref val thisSelection = diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index ac30283604..e8b8f27ac1 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -138,9 +138,10 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) ("-") must { if (cipherConfig.runTest && preCondition) { - val ignoreMe = other.actorOf(Props(new Actor { + other.actorOf(Props(new Actor { def receive = { case ("ping", x) => sender() ! ((("pong", x), sender())) } }), "echo") + val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress @@ -149,10 +150,12 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) val bytes = Array.ofDim[Byte](16) // awaitAssert just in case we are very unlucky to get same sequence more than once awaitAssert { - val randomBytes = (1 to 10).map { n => - rng.nextBytes(bytes) - bytes.toVector - }.toSet + val randomBytes = List + .fill(10) { + rng.nextBytes(bytes) + bytes.toVector + } + .toSet randomBytes.size should ===(10) } } @@ -183,7 +186,7 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) } for (i <- 1 to 1000) here ! (("ping", i)) - for (i <- 1 to 1000) expectMsgPF() { case (("pong", i), `testActor`) => true } + for (i <- 1 to 1000) expectMsgPF() { case (("pong", `i`), `testActor`) => true } } "support ask" in within(timeout.duration) { diff --git a/akka-remote/src/test/scala/akka/remote/TransientSerializationErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/TransientSerializationErrorSpec.scala index 1a7509fe22..ef8e80cd66 100644 --- a/akka-remote/src/test/scala/akka/remote/TransientSerializationErrorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/TransientSerializationErrorSpec.scala @@ -9,6 +9,7 @@ import java.io.NotSerializableException import akka.actor.{ ActorSystem, ExtendedActorSystem, RootActorPath } import akka.serialization.SerializerWithStringManifest import akka.testkit.{ AkkaSpec, TestActors, TestKit } +import akka.util.unused import com.typesafe.config.{ Config, ConfigFactory } object TransientSerializationErrorSpec { @@ -19,7 +20,7 @@ object TransientSerializationErrorSpec { object NotDeserializable object IllegalOnDeserialize - class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + class TestSerializer(@unused system: ExtendedActorSystem) extends SerializerWithStringManifest { def identifier: Int = 666 def manifest(o: AnyRef): String = o match { case ManifestNotSerializable => throw new NotSerializableException() diff --git a/akka-remote/src/test/scala/akka/remote/artery/DuplicateHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/DuplicateHandshakeSpec.scala index 920e095b57..874f1d2037 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/DuplicateHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/DuplicateHandshakeSpec.scala @@ -4,8 +4,6 @@ package akka.remote.artery -import scala.concurrent.duration._ - import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.remote.UniqueAddress @@ -33,9 +31,7 @@ class DuplicateHandshakeSpec extends AkkaSpec with ImplicitSender { val addressA = UniqueAddress(Address("akka", "sysA", "hostA", 1001), 1) val addressB = UniqueAddress(Address("akka", "sysB", "hostB", 1002), 2) - private def setupStream( - inboundContext: InboundContext, - timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { + private def setupStream(inboundContext: InboundContext): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { TestSource .probe[AnyRef] .map { msg => diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala index c75c31d6a8..a8b6b0ceab 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala @@ -86,7 +86,7 @@ class FlightRecorderSpec extends AkkaSpec { checkLogRotated(reader.structure.hiFreqLog, List(Live, Snapshot, Snapshot, Snapshot)) } - "properly report zero low frequency events" in withFlightRecorder { (recorder, reader, channel) => + "properly report zero low frequency events" in withFlightRecorder { (_, reader, channel) => channel.force(false) reader.rereadStructure() @@ -95,7 +95,7 @@ class FlightRecorderSpec extends AkkaSpec { entries.isEmpty should be(true) } - "properly report zero high frequency events" in withFlightRecorder { (recorder, reader, channel) => + "properly report zero high frequency events" in withFlightRecorder { (_, reader, channel) => channel.force(false) reader.rereadStructure() @@ -233,7 +233,7 @@ class FlightRecorderSpec extends AkkaSpec { entries.sortBy(_.code) should ===(entries.sortBy(_.timeStamp)) } - "properly store low frequency events after snapshot" in withFlightRecorder { (recorder, reader, channel) => + "properly store low frequency events after snapshot" in withFlightRecorder { (recorder, reader, _) => val sink = recorder.createEventSink() val helloBytes = "Hello".getBytes("US-ASCII") val hello2Bytes = "Hello2".getBytes("US-ASCII") @@ -306,7 +306,7 @@ class FlightRecorderSpec extends AkkaSpec { liveEntries.sortBy(_.code) should ===(liveEntries.sortBy(_.timeStamp)) } - "properly store alerts and make a snapshot" in withFlightRecorder { (recorder, reader, channel) => + "properly store alerts and make a snapshot" in withFlightRecorder { (recorder, reader, _) => val sink = recorder.createEventSink() val helloBytes = "Hello".getBytes("US-ASCII") val alertBytes = "An alert".getBytes("US-ASCII") diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala index be72336846..b965dec478 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala @@ -8,7 +8,6 @@ import scala.concurrent.duration._ import akka.actor._ import akka.testkit.ImplicitSender -import akka.testkit.SocketUtil import akka.testkit.TestActors import com.typesafe.config.ConfigFactory diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index 87663ab617..2d333bc44c 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -47,7 +47,6 @@ class InboundControlJunctionSpec "be emitted via side channel" in { val observerProbe = TestProbe() - val inboundContext = new TestInboundContext(localAddress = addressB) val recipient = OptionVal.None // not used val ((upstream, controlSubject), downstream) = TestSource diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index e751cb147e..a094048af6 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -36,9 +36,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { val addressA = UniqueAddress(Address("akka", "sysA", "hostA", 1001), 1) val addressB = UniqueAddress(Address("akka", "sysB", "hostB", 1002), 2) - private def setupStream( - inboundContext: InboundContext, - timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { + private def setupStream(inboundContext: InboundContext): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { val recipient = OptionVal.None // not used TestSource .probe[AnyRef] @@ -89,7 +87,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.request(10) // no HandshakeReq upstream.sendNext("msg17") - downstream.expectNoMsg(200.millis) // messages from unknown are dropped + downstream.expectNoMessage(200.millis) // messages from unknown are dropped // and accept messages after handshake upstream.sendNext(HandshakeReq(addressA, addressB.address)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala index a43a41d34f..6d67ebaac5 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/LateConnectSpec.scala @@ -9,7 +9,6 @@ import scala.concurrent.duration._ import akka.actor.RootActorPath import akka.remote.RARP import akka.testkit.ImplicitSender -import akka.testkit.SocketUtil import akka.testkit.TestActors import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory diff --git a/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala index 15d980571e..e91ca89a7b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala @@ -6,9 +6,11 @@ package akka.remote.artery import akka.testkit.AkkaSpec import akka.util.Unsafe +import com.github.ghik.silencer.silent import scala.util.Random +@silent class LruBoundedCacheSpec extends AkkaSpec { class TestCache(_capacity: Int, threshold: Int, hashSeed: String = "") @@ -217,6 +219,7 @@ class LruBoundedCacheSpec extends AkkaSpec { // Have not seen lower than 890 stats.entries should be > 750 // Have not seen higher than 1.8 + stats.averageProbeDistance should be < 2.5 // Have not seen higher than 15 stats.maxProbeDistance should be < 25 diff --git a/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala index ada1060af1..adcc2d233a 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/MetadataCarryingSpec.scala @@ -125,7 +125,7 @@ class MetadataCarryingSpec extends ArteryMultiNodeSpec(""" proxyA ! Ping() expectMsgType[Ping] - val writeA = instrumentProbeA.expectMsgType[RemoteWriteMetadata] + instrumentProbeA.expectMsgType[RemoteWriteMetadata] val sentA = instrumentProbeA.expectMsgType[RemoteMessageSent] val readB = instrumentProbeB.expectMsgType[RemoteReadMetadata] val recvdB = instrumentProbeB.expectMsgType[RemoteMessageReceived] @@ -136,7 +136,7 @@ class MetadataCarryingSpec extends ArteryMultiNodeSpec(""" recvdB.time should be > 0L // for the reply - val writeB = instrumentProbeB.expectMsgType[RemoteWriteMetadata] + instrumentProbeB.expectMsgType[RemoteWriteMetadata] val sentB = instrumentProbeB.expectMsgType[RemoteMessageSent] val readA = instrumentProbeA.expectMsgType[RemoteReadMetadata] val recvdA = instrumentProbeA.expectMsgType[RemoteMessageReceived] diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index 65367fc86d..75252e2189 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -59,7 +59,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { "send HandshakeReq when first pulled" in { val inboundContext = new TestInboundContext(localAddress = addressA) val outboundContext = inboundContext.association(addressB.address) - val (upstream, downstream) = setupStream(outboundContext) + val (_, downstream) = setupStream(outboundContext) downstream.request(10) downstream.expectNext(HandshakeReq(addressA, addressB.address)) @@ -82,7 +82,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { "timeout if handshake not completed" in { val inboundContext = new TestInboundContext(localAddress = addressA) val outboundContext = inboundContext.association(addressB.address) - val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis) + val (_, downstream) = setupStream(outboundContext, timeout = 200.millis) downstream.request(1) downstream.expectNext(HandshakeReq(addressA, addressB.address)) @@ -92,7 +92,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { "retry HandshakeReq" in { val inboundContext = new TestInboundContext(localAddress = addressA) val outboundContext = inboundContext.association(addressB.address) - val (upstream, downstream) = setupStream(outboundContext, retryInterval = 100.millis) + val (_, downstream) = setupStream(outboundContext, retryInterval = 100.millis) downstream.request(10) downstream.expectNext(HandshakeReq(addressA, addressB.address)) @@ -109,7 +109,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.request(10) downstream.expectNext(HandshakeReq(addressA, addressB.address)) upstream.sendNext("msg1") - downstream.expectNoMsg(200.millis) + downstream.expectNoMessage(200.millis) // InboundHandshake stage will complete the handshake when receiving HandshakeRsp inboundContext.completeHandshake(addressB) downstream.expectNext("msg1") @@ -129,7 +129,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { inboundContext.completeHandshake(addressB) downstream.expectNext("msg1") - downstream.expectNoMsg(600.millis) + downstream.expectNoMessage(600.millis) upstream.sendNext("msg2") upstream.sendNext("msg3") upstream.sendNext("msg4") @@ -137,7 +137,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.expectNext("msg2") downstream.expectNext("msg3") downstream.expectNext("msg4") - downstream.expectNoMsg(600.millis) + downstream.expectNoMessage(600.millis) downstream.cancel() } @@ -145,7 +145,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { "send HandshakeReq for liveness probing" in { val inboundContext = new TestInboundContext(localAddress = addressA) val outboundContext = inboundContext.association(addressB.address) - val (upstream, downstream) = setupStream(outboundContext, livenessProbeInterval = 200.millis) + val (_, downstream) = setupStream(outboundContext, livenessProbeInterval = 200.millis) downstream.request(10) // this is from the initial diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala index 94c8b85853..83dd342bc3 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala @@ -49,7 +49,7 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" "Outbound streams" should { - "be stopped when they are idle" in withAssociation { (_, remoteAddress, remoteEcho, localArtery, localProbe) => + "be stopped when they are idle" in withAssociation { (_, remoteAddress, _, localArtery, _) => val association = localArtery.association(remoteAddress) withClue("When initiating a connection, both the control and ordinary streams are opened") { assertStreamActive(association, Association.ControlQueueIndex, expected = true) @@ -84,51 +84,49 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" } } - "eliminate quarantined association when not used" in withAssociation { - (_, remoteAddress, remoteEcho, localArtery, localProbe) => - val association = localArtery.association(remoteAddress) - withClue("When initiating a connection, both the control and ordinary streams are opened") { - assertStreamActive(association, Association.ControlQueueIndex, expected = true) - assertStreamActive(association, Association.OrdinaryQueueIndex, expected = true) - } + "eliminate quarantined association when not used" in withAssociation { (_, remoteAddress, _, localArtery, _) => + val association = localArtery.association(remoteAddress) + withClue("When initiating a connection, both the control and ordinary streams are opened") { + assertStreamActive(association, Association.ControlQueueIndex, expected = true) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = true) + } - val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid + val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid - localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") - eventually { - assertStreamActive(association, Association.ControlQueueIndex, expected = false) - assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) - } + eventually { + assertStreamActive(association, Association.ControlQueueIndex, expected = false) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) + } - // the outbound streams are inactive and association quarantined, then it's completely removed - eventually { - localArtery.remoteAddresses should not contain remoteAddress - } + // the outbound streams are inactive and association quarantined, then it's completely removed + eventually { + localArtery.remoteAddresses should not contain remoteAddress + } } - "remove inbound compression after quarantine" in withAssociation { - (_, remoteAddress, remoteEcho, localArtery, localProbe) => - val association = localArtery.association(remoteAddress) - val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid + "remove inbound compression after quarantine" in withAssociation { (_, remoteAddress, _, localArtery, _) => + val association = localArtery.association(remoteAddress) + val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid - localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid) + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid) - eventually { - assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) - } - // compression still exists when idle - localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid) + eventually { + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) + } + // compression still exists when idle + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid) - localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") - // after quarantine it should be removed - eventually { - localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should not contain remoteUid - } + localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + // after quarantine it should be removed + eventually { + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should not contain remoteUid + } } "remove inbound compression after restart with same host:port" in withAssociation { - (remoteSystem, remoteAddress, remoteEcho, localArtery, localProbe) => + (remoteSystem, remoteAddress, _, localArtery, localProbe) => val association = localArtery.association(remoteAddress) val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorForSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorForSpec.scala index fad4d0435f..656d4f16e6 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorForSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorForSpec.scala @@ -10,6 +10,7 @@ import akka.remote.RemoteActorRef import akka.remote.RemotingSpec.ActorForReq import akka.testkit.{ EventFilter, _ } import akka.util.Timeout +import com.github.ghik.silencer.silent import scala.concurrent.duration._ @@ -17,6 +18,7 @@ object RemoteActorForSpec { final case class ActorForReq(s: String) extends JavaSerializable } +@silent class RemoteActorForSpec extends ArteryMultiNodeSpec("akka.loglevel=INFO") with ImplicitSender with DefaultTimeout { val remoteSystem = newRemoteSystem() @@ -94,7 +96,7 @@ class RemoteActorForSpec extends ArteryMultiNodeSpec("akka.loglevel=INFO") with // msg to old ActorRef (different uid) should not get through child2.path.uid should not be (child.path.uid) child ! 46 - expectNoMsg(1.second) + expectNoMessage(1.second) system.actorFor(system / "looker1" / "child") ! 47 expectMsg(47) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala index 8e7205411e..8980306808 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala @@ -8,10 +8,12 @@ import akka.testkit._ import akka.actor._ import com.typesafe.config.ConfigFactory import akka.actor.RootActorPath + import scala.concurrent.duration._ import akka.testkit.SocketUtil import akka.remote.QuarantinedEvent import akka.remote.RARP +import com.github.ghik.silencer.silent object RemoteDeathWatchSpec { val otherPort = ArteryMultiNodeSpec.freePort(ConfigFactory.load()) @@ -79,7 +81,9 @@ class RemoteDeathWatchSpec "receive Terminated when watched node is unknown host" in { val path = RootActorPath(Address("akka", system.name, "unknownhost", 2552)) / "user" / "subject" system.actorOf(Props(new Actor { - context.watch(context.actorFor(path)) + @silent + val watchee = context.actorFor(path) + context.watch(watchee) def receive = { case t: Terminated => testActor ! t.actor.path } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala index 2add7ffc93..e6b493db1d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala @@ -34,7 +34,7 @@ class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender { system.actorSelection(rootActorPath(sys) / "user" / "echo") } - val echo = system.actorOf(TestActors.echoActorProps, name = "echo") + system.actorOf(TestActors.echoActorProps, name = "echo") val localSelection = system.actorSelection(rootActorPath(system) / "user" / "echo") val n = 100 diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSerializationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSerializationSpec.scala index 5279ca120f..85c2ab7936 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSerializationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSerializationSpec.scala @@ -8,9 +8,10 @@ import akka.actor.{ ActorRef, ActorSystem, ExtendedActorSystem, InternalActorRef import akka.event._ import akka.testkit.TestEvent.Mute import akka.testkit.{ AkkaSpec, EventFilter, TestProbe } -import akka.util.OptionVal +import akka.util.{ unused, OptionVal } import java.nio.{ ByteBuffer, CharBuffer } import java.nio.charset.Charset + import scala.concurrent.duration._ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG") { @@ -42,7 +43,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG val ri = remoteInstruments(testInstrument(1, "!")) serializeDeserialize(ri, ri, p.ref, "foo") p.expectMsgAllOf("foo-1-!") - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) } "serialize and deserialize multiple remote instruments in the correct order" in { @@ -50,7 +51,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG val ri = remoteInstruments(testInstrument(1, "!"), testInstrument(31, "???"), testInstrument(10, "..")) serializeDeserialize(ri, ri, p.ref, "bar") p.expectMsgAllOf("bar-1-!", "bar-10-..", "bar-31-???") - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) } "skip exitsing remote instruments not in the message" in { @@ -61,7 +62,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG val riD = remoteInstruments(instruments: _*) serializeDeserialize(riS, riD, p.ref, "baz") p.expectMsgAllOf("baz-7-!", "baz-21-???") - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) } } @@ -73,7 +74,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG val riD = remoteInstruments(instruments(0), instruments(2)) serializeDeserialize(riS, riD, p.ref, "buz") p.expectMsgAllOf("buz-6-!", "buz-19-???") - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) } } @@ -84,7 +85,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG val riS = remoteInstruments(instruments: _*) val riD = remoteInstruments() serializeDeserialize(riS, riD, p.ref, "boz") - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) } } @@ -98,7 +99,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG val ri = remoteInstruments(instruments: _*) serializeDeserialize(ri, ri, p.ref, "woot") p.expectMsgAllOf("woot-10-..", "woot-21-???") - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) } } @@ -114,7 +115,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG val ri = remoteInstruments(instruments: _*) serializeDeserialize(ri, ri, p.ref, "waat") p.expectMsgAllOf("waat-10-..") - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) } } } @@ -122,7 +123,7 @@ class RemoteInstrumentsSerializationSpec extends AkkaSpec("akka.loglevel = DEBUG object RemoteInstrumentsSerializationSpec { - class Filter(settings: ActorSystem.Settings, stream: EventStream) extends LoggingFilter { + class Filter(@unused settings: ActorSystem.Settings, stream: EventStream) extends LoggingFilter { stream.publish(Mute(EventFilter.debug())) override def isErrorEnabled(logClass: Class[_], logSource: String): Boolean = true diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSpec.scala index a5b6b525de..f89b00d1d6 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteInstrumentsSpec.scala @@ -5,7 +5,7 @@ package akka.remote.artery import org.scalacheck.{ Arbitrary, Gen } -import org.scalatest.prop.Checkers +import org.scalatestplus.scalacheck.Checkers import org.scalatest.{ Matchers, WordSpec } class RemoteInstrumentsSpec extends WordSpec with Matchers with Checkers { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala index c4d7a9c8fc..5d5bd063de 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteMessageSerializationSpec.scala @@ -11,6 +11,7 @@ import akka.actor.{ Actor, ActorRef, PoisonPill, Props } import akka.remote.{ AssociationErrorEvent, DisassociatedEvent, OversizedPayloadException, RARP } import akka.testkit.{ EventFilter, ImplicitSender, TestActors } import akka.util.ByteString +import com.github.ghik.silencer.silent import scala.concurrent.duration._ @@ -39,7 +40,7 @@ class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" object Unserializable EventFilter[NotSerializableException](pattern = ".*No configured serialization.*", occurrences = 1).intercept { verifySend(Unserializable) { - expectNoMsg(1.second) // No AssocitionErrorEvent should be published + expectNoMessage(1.second) // No AssocitionErrorEvent should be published } } } @@ -57,7 +58,7 @@ class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" EventFilter[OversizedPayloadException](start = "Failed to serialize oversized message", occurrences = 1) .intercept { verifySend(oversized) { - expectNoMsg(1.second) // No AssocitionErrorEvent should be published + expectNoMessage(1.second) // No AssocitionErrorEvent should be published } } } @@ -68,7 +69,7 @@ class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" EventFilter[OversizedPayloadException](pattern = ".*Discarding oversized payload received.*", occurrences = 1) .intercept { verifySend(maxPayloadBytes + 1) { - expectNoMsg(1.second) // No AssocitionErrorEvent should be published + expectNoMessage(1.second) // No AssocitionErrorEvent should be published } } } @@ -93,6 +94,7 @@ class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" case x => sender() ! x } }), bigBounceId) + @silent val bigBounceHere = localSystem.actorFor(s"akka://${remoteSystem.name}@localhost:$remotePort/user/$bigBounceId") val eventForwarder = localSystem.actorOf(Props(new Actor { @@ -105,7 +107,7 @@ class RemoteMessageSerializationSpec extends ArteryMultiNodeSpec(""" try { bigBounceHere ! msg afterSend - expectNoMsg(500.millis) + expectNoMessage(500.millis) } finally { localSystem.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent]) localSystem.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent]) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 4a438a6545..519cb585a4 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -62,7 +62,7 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) "Artery" must { "be able to identify a remote actor and ping it" in { - val actorOnSystemB = systemB.actorOf(Props(new Actor { + systemB.actorOf(Props(new Actor { def receive = { case "ping" => sender() ! "pong" } @@ -100,7 +100,7 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) val probe = TestProbe()(systemB) probe.watch(echo) probe.expectTerminated(echo) - expectNoMsg(1.second) + expectNoMessage(1.second) val echo2 = systemB.actorOf(TestActors.echoActorProps, "otherEcho1") echo2 ! 73 @@ -108,7 +108,7 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) // msg to old ActorRef (different uid) should not get through echo2.path.uid should not be (echo.path.uid) echo ! 74 - expectNoMsg(1.second) + expectNoMessage(1.second) } "be able to send messages concurrently preserving order" in { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala index 792f4acc32..70ef93b107 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala @@ -97,8 +97,6 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon "A RemoteWatcher" must { "have correct interaction when watching" in { - - val fd = createFailureDetector() val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1") @@ -113,48 +111,48 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon monitorA ! Stats // (a1->b1), (a1->b2), (a2->b2) expectMsg(Stats.counts(watching = 3, watchingNodes = 1)) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA.tell(heartbeatRspB, monitorB) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! UnwatchRemote(b1, a1) // still (a1->b2) and (a2->b2) left monitorA ! Stats expectMsg(Stats.counts(watching = 2, watchingNodes = 1)) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! UnwatchRemote(b2, a2) // still (a1->b2) left monitorA ! Stats expectMsg(Stats.counts(watching = 1, watchingNodes = 1)) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! UnwatchRemote(b2, a1) // all unwatched monitorA ! Stats expectMsg(Stats.empty) - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick - expectNoMsg(100 millis) + expectNoMessage(100 millis) monitorA ! HeartbeatTick - expectNoMsg(100 millis) + expectNoMessage(100 millis) // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } "generate AddressTerminated when missing heartbeats" taggedAs LongRunningTest in { @@ -174,7 +172,7 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA.tell(heartbeatRspB, monitorB) - expectNoMsg(1 second) + expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA.tell(heartbeatRspB, monitorB) @@ -191,7 +189,7 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon } // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } "generate AddressTerminated when missing first heartbeat" taggedAs LongRunningTest in { @@ -200,10 +198,9 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) - val fd = createFailureDetector() val heartbeatExpectedResponseAfter = 2.seconds val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5") - val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") + createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") val a = system.actorOf(Props[MyActor], "a5").asInstanceOf[InternalActorRef] val b = createRemoteActor(Props[MyActor], "b5") @@ -227,7 +224,7 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon } // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } "generate AddressTerminated for new watch after broken connection that was re-established and broken again" taggedAs LongRunningTest in { @@ -247,7 +244,7 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA.tell(heartbeatRspB, monitorB) - expectNoMsg(1 second) + expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA.tell(heartbeatRspB, monitorB) @@ -269,7 +266,7 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon monitorA ! Stats expectMsg(Stats.empty) } - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) // assume that connection comes up again, or remote system is restarted val c = createRemoteActor(Props[MyActor], "c6") @@ -279,22 +276,22 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA.tell(heartbeatRspB, monitorB) - expectNoMsg(1 second) + expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA.tell(heartbeatRspB, monitorB) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA ! ReapUnreachableTick - p.expectNoMsg(1 second) + p.expectNoMessage(1 second) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA.tell(heartbeatRspB, monitorB) monitorA ! HeartbeatTick expectMsg(ArteryHeartbeat) monitorA ! ReapUnreachableTick - p.expectNoMsg(1 second) - q.expectNoMsg(1 second) + p.expectNoMessage(1 second) + q.expectNoMessage(1 second) // then stop heartbeating again, should generate new AddressTerminated within(10 seconds) { @@ -309,7 +306,7 @@ class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultCon } // make sure nothing floods over to next test - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala index 8b7612730e..776de1e4b2 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala @@ -91,7 +91,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with Source.fromGraph(new SendQueue[String](sendToDeadLetters)).toMat(TestSink.probe)(Keep.both).run() downstream.request(10) - downstream.expectNoMsg(200.millis) + downstream.expectNoMessage(200.millis) sendQueue.inject(queue) downstream.expectNext("a") downstream.expectNext("b") @@ -163,7 +163,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with def test(f: (Queue[String], SendQueue.QueueValue[String], TestSubscriber.Probe[String]) => Unit): Unit = { - (1 to 100).foreach { n => + (1 to 100).foreach { _ => val queue = createQueue[String](16) val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](sendToDeadLetters)).toMat(TestSink.probe)(Keep.both).run() diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala index 654e30e7eb..b05c48aec8 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala @@ -4,8 +4,6 @@ package akka.remote.artery -import scala.concurrent.duration._ - import akka.actor.Address import akka.remote.UniqueAddress import akka.remote.artery.SystemMessageDelivery._ @@ -30,9 +28,7 @@ class SystemMessageAckerSpec extends AkkaSpec with ImplicitSender { val addressB = UniqueAddress(Address("akka", "sysB", "hostB", 1002), 2) val addressC = UniqueAddress(Address("akka", "sysC", "hostB", 1003), 3) - private def setupStream( - inboundContext: InboundContext, - timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { + private def setupStream(inboundContext: InboundContext): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { val recipient = OptionVal.None // not used TestSource .probe[AnyRef] diff --git a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala index a06c0d5ba3..c31e6eb579 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/UntrustedSpec.scala @@ -131,13 +131,13 @@ class UntrustedSpec extends ArteryMultiNodeSpec(UntrustedSpec.config) with Impli receptionist ! StopChild("child2") expectMsg("child2 stopped") // no Terminated msg, since watch was discarded - expectNoMsg(1.second) + expectNoMessage(1.second) } "discard actor selection" in { val sel = client.actorSelection(RootActorPath(address) / testActor.path.elements) sel ! "hello" - expectNoMsg(1.second) + expectNoMessage(1.second) } "discard actor selection with non root anchor" in { @@ -147,25 +147,25 @@ class UntrustedSpec extends ArteryMultiNodeSpec(UntrustedSpec.config) with Impli val sel = ActorSelection(clientReceptionistRef, receptionist.path.toStringWithoutAddress) sel ! "hello" - expectNoMsg(1.second) + expectNoMessage(1.second) } "discard actor selection to child of matching white list" in { val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements / "child1") sel ! "hello" - expectNoMsg(1.second) + expectNoMessage(1.second) } "discard actor selection with wildcard" in { val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements / "*") sel ! "hello" - expectNoMsg(1.second) + expectNoMessage(1.second) } "discard actor selection containing harmful message" in { val sel = client.actorSelection(RootActorPath(address) / receptionist.path.elements) sel ! PoisonPill - expectNoMsg(1.second) + expectNoMessage(1.second) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala index 58a340970c..7b5644fda6 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala @@ -70,7 +70,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender { val payload = new Array[Byte](100000) val done = Source(1 to 1000) .map(_ => payload) - .map { n => + .map { _ => val envelope = pool.acquire() envelope.byteBuffer.put(payload) envelope.byteBuffer.flip() diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index b889043a5b..f5552e8ef4 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -68,7 +68,7 @@ class CompressionIntegrationSpec // cause TestMessage manifest to become a heavy hitter // cause echo to become a heavy hitter - (1 to messagesToExchange).foreach { i => + (1 to messagesToExchange).foreach { _ => echoRefA ! TestMessage("hello") } receiveN(messagesToExchange) // the replies @@ -224,7 +224,7 @@ class CompressionIntegrationSpec val echoRefA = expectMsgType[ActorIdentity].ref.get // cause TestMessage manifest to become a heavy hitter - (1 to messagesToExchange).foreach { i => + (1 to messagesToExchange).foreach { _ => echoRefA ! TestMessage("hello") } receiveN(messagesToExchange) // the replies @@ -280,7 +280,7 @@ class CompressionIntegrationSpec classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable]) def createAndIdentify(i: Int) = { - val echoWrap = systemWrap.actorOf(TestActors.echoActorProps, s"echo_$i") + systemWrap.actorOf(TestActors.echoActorProps, s"echo_$i") system.actorSelection(rootActorPath(systemWrap) / "user" / s"echo_$i") ! Identify(None) expectMsgType[ActorIdentity].ref.get } @@ -304,7 +304,7 @@ class CompressionIntegrationSpec allRefs ::= echoWrap // cause echo to become a heavy hitter - (1 to messagesToExchange).foreach { i => + (1 to messagesToExchange).foreach { _ => echoWrap ! TestMessage("hello") } receiveN(messagesToExchange) // the replies diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala index fa771584b8..fb144cfb23 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -50,7 +50,10 @@ class HandshakeShouldDropCompressionTableSpec "Outgoing compression table" must { "be dropped on system restart" in { val messagesToExchange = 10 - val systemATransport = RARP(system).provider.transport.asInstanceOf[ArteryTransport] + + // System A transport: + RARP(system).provider.transport.asInstanceOf[ArteryTransport] + def systemBTransport = RARP(systemB).provider.transport.asInstanceOf[ArteryTransport] // listen for compression table events @@ -126,7 +129,7 @@ class HandshakeShouldDropCompressionTableSpec def waitForEcho(probe: TestKit, m: String, max: Duration = 3.seconds): Any = probe.fishForMessage(max = max, hint = s"waiting for '$m'") { case `m` => true - case x => false + case _ => false } def identify(_system: String, port: Int, name: String) = { diff --git a/akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala index 6101e1f142..7b369cce26 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/tcp/TlsTcpSpec.scala @@ -128,10 +128,12 @@ abstract class TlsTcpSpec(config: Config) // https://doc.akka.io/docs/akka/current/security/2018-08-29-aes-rng.html // awaitAssert just in case we are very unlucky to get same sequence more than once awaitAssert { - val randomBytes = (1 to 10).map { n => - rng.nextBytes(bytes) - bytes.toVector - }.toSet + val randomBytes = List + .fill(10) { + rng.nextBytes(bytes) + bytes.toVector + } + .toSet randomBytes.size should ===(10) } } diff --git a/akka-remote/src/test/scala/akka/remote/serialization/AllowJavaSerializationOffSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/AllowJavaSerializationOffSpec.scala index 3787f6893b..f711f4606d 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/AllowJavaSerializationOffSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/AllowJavaSerializationOffSpec.scala @@ -125,7 +125,7 @@ class AllowJavaSerializationOffSpec p.ref ! new ProgrammaticJavaDummy SerializationExtension(system).findSerializerFor(new ProgrammaticJavaDummy).toBinary(new ProgrammaticJavaDummy) // should not receive this one, it would have been java serialization! - p.expectNoMsg(100.millis) + p.expectNoMessage(100.millis) p.ref ! new ProgrammaticDummy p.expectMsgType[ProgrammaticDummy] diff --git a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala index bdca458d31..ad03cfc849 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala @@ -6,23 +6,13 @@ package akka.remote.serialization import akka.actor.ActorSystem import akka.testkit.TestKit -import akka.actor.{ - Actor, - ActorRef, - Address, - Deploy, - ExtendedActorSystem, - OneForOneStrategy, - Props, - SupervisorStrategy -} +import akka.actor.{ Actor, ActorRef, Address, Deploy, ExtendedActorSystem, Props, SupervisorStrategy } import akka.remote.{ DaemonMsgCreate, RemoteScope } import akka.routing.{ FromConfig, RoundRobinPool } import akka.serialization.{ Serialization, SerializationExtension } import akka.testkit.AkkaSpec +import akka.util.unused import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import scala.language.postfixOps object DaemonMsgCreateSerializerSpec { @@ -30,14 +20,15 @@ object DaemonMsgCreateSerializerSpec { def receive = Actor.emptyBehavior } class MyActor extends EmptyActor - class MyActorWithParam(ignore: String) extends EmptyActor - class MyActorWithFunParam(fun: Function1[Int, Int]) extends EmptyActor - class ActorWithDummyParameter(javaSerialized: DummyParameter, protoSerialized: ActorRef) extends EmptyActor + class MyActorWithParam(@unused ignore: String) extends EmptyActor + class MyActorWithFunParam(@unused fun: Function1[Int, Int]) extends EmptyActor + class ActorWithDummyParameter(@unused javaSerialized: DummyParameter, @unused protoSerialized: ActorRef) + extends EmptyActor } case class DummyParameter(val inner: String) extends Serializable -trait SerializationVerification { self: AkkaSpec => +private[akka] trait SerializationVerification { self: AkkaSpec => def ser: Serialization @@ -205,11 +196,6 @@ class DaemonMsgCreateSerializerNoJavaSerializationSpec extends AkkaSpec(""" "serialize and de-serialize DaemonMsgCreate with Deploy and RouterConfig" in { verifySerialization { - // Duration.Inf doesn't equal Duration.Inf, so we use another for test - val supervisorStrategy = OneForOneStrategy(3, 10 seconds) { - case _ => SupervisorStrategy.Escalate - } - val deploy1 = Deploy( path = "path1", config = ConfigFactory.parseString("a=1"), diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala index b1f507bdd6..5df8355128 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala @@ -21,9 +21,9 @@ import akka.routing._ object MiscMessageSerializerSpec { val serializationTestOverrides = - """ + s""" akka.actor { - serialization-bindings = { "akka.remote.serialization.MiscMessageSerializerSpec$TestException" = akka-misc } ${akka.actor.java-serialization-disabled-additional-serialization-bindings} + serialization-bindings = { "akka.remote.serialization.MiscMessageSerializerSpec$$TestException" = akka-misc } $${akka.actor.java-serialization-disabled-additional-serialization-bindings} } """ diff --git a/akka-remote/src/test/scala/akka/remote/serialization/SystemMessageSerializationSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/SystemMessageSerializationSpec.scala index 87a332d4e8..3e330fd32a 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/SystemMessageSerializationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/SystemMessageSerializationSpec.scala @@ -17,7 +17,7 @@ object SystemMessageSerializationSpec { val testConfig = ConfigFactory.parseString(serializationTestOverrides).withFallback(AkkaSpec.testConf) - class TestException(msg: String) extends RuntimeException { + class TestException(msg: String) extends RuntimeException(msg) { override def equals(other: Any): Boolean = other match { case e: TestException => e.getMessage == getMessage case _ => false diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 3f47e373a8..851159d155 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -205,8 +205,8 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = remote """) wit reader ! testAssociate(uid = 33, cookie = None) awaitCond(registry.logSnapshot.exists { - case DisassociateAttempt(requester, remote) => true - case _ => false + case DisassociateAttempt(_, _) => true + case _ => false }) } @@ -264,8 +264,8 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = remote """) wit reader ! testAssociate(uid = 33, Some("xyzzy")) awaitCond(registry.logSnapshot.exists { - case DisassociateAttempt(requester, remote) => true - case _ => false + case DisassociateAttempt(_, _) => true + case _ => false }) } @@ -472,7 +472,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = remote """) wit } "give up outbound after connection timeout" in { - val (failureDetector, registry, transport, handle) = collaborators + val (failureDetector, _, transport, handle) = collaborators handle.writable = false // nothing will be written transport.associateBehavior.pushConstant(handle) @@ -499,7 +499,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = remote """) wit } "give up inbound after connection timeout" in { - val (failureDetector, registry, _, handle) = collaborators + val (failureDetector, _, _, handle) = collaborators val conf2 = ConfigFactory.parseString("akka.remote.netty.tcp.connection-timeout = 500 ms").withFallback(conf) diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala index f8a1aa1ec8..703f458630 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -100,8 +100,7 @@ class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress val rootB = RootActorPath(addressB) val here = { - val path = - system.actorSelection(rootB / "user" / "echo") ! Identify(None) + system.actorSelection(rootB / "user" / "echo") ! Identify(None) expectMsgType[ActorIdentity].ref.get } @@ -111,7 +110,7 @@ class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with systemB.eventStream.publish(TestEvent.Mute(DeadLettersFilter[Any])) Await.result(RARP(system).provider.transport.managementCommand(One(addressB, Drop(0.1, 0.1))), 3.seconds.dilated) - val tester = system.actorOf(Props(classOf[SequenceVerifier], here, self)) ! "start" + system.actorOf(Props(classOf[SequenceVerifier], here, self)) ! "start" expectMsgPF(60.seconds) { case (received: Int, lost: Int) => @@ -123,7 +122,7 @@ class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with override def beforeTermination(): Unit = { system.eventStream.publish( TestEvent.Mute( - EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"), + EventFilter.warning(source = s"akka://AkkaProtocolStressTest/user/$$a", start = "received dead letter"), EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) systemB.eventStream.publish( TestEvent.Mute( diff --git a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala index f640f2441e..07a784d5ac 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala @@ -155,7 +155,7 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) awaitCond(registry.existsAssociation(addressATest, addressBTest)) - handleA.disassociate() + handleA.disassociate("Test disassociation", log) expectMsgPF(timeout.duration) { case Disassociated(_) => diff --git a/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala index 1b4173cac0..0c4f82dc0a 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala @@ -98,7 +98,7 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout { "log calls and parameters" in { val logPromise = Promise[Int]() - val behavior = new SwitchableLoggedBehavior[Int, Int]((i) => Future.successful(3), (i) => logPromise.success(i)) + val behavior = new SwitchableLoggedBehavior[Int, Int](_ => Future.successful(3), i => logPromise.success(i)) behavior(11) Await.result(logPromise.future, timeout.duration) should ===(11) diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index ad69d4822c..1f9352c977 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -168,7 +168,6 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String) systemB.actorOf(Props(classOf[SystemMessageSender], msgCount, burstSize, burstDelay, targetForB)) systemA.actorOf(Props(classOf[SystemMessageSender], msgCount, burstSize, burstDelay, targetForA)) - val toSend = (0 until msgCount).toList var maxDelay = 0L for (m <- 0 until msgCount) { diff --git a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala index 54561be75f..4e4081a553 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala @@ -56,7 +56,7 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender "fail to associate with nonexisting address" in { val registry = new AssociationRegistry - var transportA = new TestTransport(addressA, registry) + val transportA = new TestTransport(addressA, registry) Await.result(transportA.listen, timeout.duration)._2.success(ActorAssociationEventListener(self)) @@ -127,7 +127,7 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender awaitCond(registry.existsAssociation(addressA, addressB)) - handleA.disassociate() + handleA.disassociate("Test disassociation", log) expectMsgPF(timeout.duration) { case Disassociated(_) => diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala index 7fa61b2c19..fa62443c0a 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -94,7 +94,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende "ThrottlerTransportAdapter" must { "maintain average message rate" taggedAs TimingTest in { throttle(Direction.Send, TokenBucket(200, 500, 0, 0)) should ===(true) - val tester = system.actorOf(Props(classOf[ThrottlingTester], here, self)) ! "start" + system.actorOf(Props(classOf[ThrottlingTester], here, self)) ! "start" val time = NANOSECONDS.toSeconds(expectMsgType[Long]((TotalTime + 3).seconds)) log.warning("Total time of transmission: " + time) @@ -112,9 +112,9 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende throttle(Direction.Both, Blackhole) should ===(true) here ! Lost("Blackhole 2") - expectNoMsg(1.seconds) + expectNoMessage(1.seconds) disassociate() should ===(true) - expectNoMsg(1.seconds) + expectNoMessage(1.seconds) throttle(Direction.Both, Unthrottled) should ===(true) @@ -142,7 +142,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende override def beforeTermination(): Unit = { system.eventStream.publish( TestEvent.Mute( - EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"), + EventFilter.warning(source = s"akka://AkkaProtocolStressTest/user/$$a", start = "received dead letter"), EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) systemB.eventStream.publish( TestEvent.Mute( diff --git a/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala index 1d36e81a70..bdaa2acf31 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala @@ -33,7 +33,7 @@ object NettyTransportSpec { } implicit class RichAkkaAddress(address: Address) { - def withProtocol(protocol: String)(implicit system: ActorSystem) = + def withProtocol(protocol: String) = address.copy(protocol = protocol) } } diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 6539e60a1a..d96e2c9cda 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -30,7 +30,6 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "akka-bench-jmh-typed", "akka-multi-node-testkit", "akka-persistence-tck", - "akka-remote", "akka-stream-tests", "akka-stream-tests-tck")