Fix compiler warnings in akka-remote, akka-actor-typed. (#26139)

Fix compiler warnings in akka-remote, akka-actor-typed. (#26139)
This commit is contained in:
Helena Edelson 2018-12-20 09:10:54 -08:00 committed by GitHub
parent 15e2527629
commit 408fab3313
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 144 additions and 151 deletions

View file

@ -7,6 +7,7 @@ package akka.actor.testkit.typed
import akka.actor.typed.{ ActorRef, Behavior, Props } import akka.actor.typed.{ ActorRef, Behavior, Props }
import akka.annotation.{ DoNotInherit, InternalApi } import akka.annotation.{ DoNotInherit, InternalApi }
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
import akka.util.unused
import scala.compat.java8.FunctionConverters._ import scala.compat.java8.FunctionConverters._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
@ -114,18 +115,18 @@ object Effect {
private[akka] final class SpawnedAnonymousAdapter[T](val ref: ActorRef[T]) private[akka] final class SpawnedAnonymousAdapter[T](val ref: ActorRef[T])
extends Effect with Product with Serializable { extends Effect with Product with Serializable {
override def equals(other: Any) = other match { override def equals(other: Any): Boolean = other match {
case _: SpawnedAnonymousAdapter[_] true case _: SpawnedAnonymousAdapter[_] true
case _ false case _ false
} }
override def hashCode: Int = Nil.## override def hashCode: Int = Nil.##
override def toString: String = "SpawnedAnonymousAdapter" override def toString: String = "SpawnedAnonymousAdapter"
override def productPrefix = "SpawnedAnonymousAdapter" override def productPrefix: String = "SpawnedAnonymousAdapter"
override def productIterator = Iterator.empty override def productIterator: Iterator[_] = Iterator.empty
override def productArity = 0 override def productArity: Int = 0
override def productElement(n: Int) = throw new NoSuchElementException override def productElement(n: Int) = throw new NoSuchElementException
override def canEqual(o: Any) = o.isInstanceOf[SpawnedAnonymousAdapter[_]] override def canEqual(o: Any): Boolean = o.isInstanceOf[SpawnedAnonymousAdapter[_]]
} }
/** /**
@ -134,7 +135,7 @@ object Effect {
@InternalApi @InternalApi
private[akka] object SpawnedAnonymousAdapter { private[akka] object SpawnedAnonymousAdapter {
def apply[T]() = new SpawnedAnonymousAdapter[T](null) def apply[T]() = new SpawnedAnonymousAdapter[T](null)
def unapply[T](s: SpawnedAnonymousAdapter[T]): Boolean = true def unapply[T](@unused s: SpawnedAnonymousAdapter[T]): Boolean = true
} }
/** /**

View file

@ -10,16 +10,17 @@ import java.util.{ List ⇒ JList }
import akka.actor.typed.{ ActorRef, ActorSystem } import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.actor.testkit.typed.internal.TestProbeImpl import akka.actor.testkit.typed.internal.TestProbeImpl
import akka.actor.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.actor.testkit.typed.{ FishingOutcome, TestKitSettings }
import akka.actor.testkit.typed.scaladsl.TestDuration import akka.actor.testkit.typed.scaladsl.TestDuration
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
import akka.util.unused
import scala.collection.immutable import scala.collection.immutable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.annotation.InternalApi
object FishingOutcomes { object FishingOutcomes {
/** /**
* Consume this message and continue with the next * Consume this message and continue with the next
@ -47,13 +48,13 @@ object TestProbe {
def create[M](system: ActorSystem[_]): TestProbe[M] = def create[M](system: ActorSystem[_]): TestProbe[M] =
create(name = "testProbe", system) create(name = "testProbe", system)
def create[M](clazz: Class[M], system: ActorSystem[_]): TestProbe[M] = def create[M](@unused clazz: Class[M], system: ActorSystem[_]): TestProbe[M] =
create(system) create(system)
def create[M](name: String, system: ActorSystem[_]): TestProbe[M] = def create[M](name: String, system: ActorSystem[_]): TestProbe[M] =
new TestProbeImpl[M](name, system) new TestProbeImpl[M](name, system)
def create[M](name: String, clazz: Class[M], system: ActorSystem[_]): TestProbe[M] = def create[M](name: String, @unused clazz: Class[M], system: ActorSystem[_]): TestProbe[M] =
new TestProbeImpl[M](name, system) new TestProbeImpl[M](name, system)
} }

View file

@ -19,7 +19,7 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
"be able to become in its constructor" in { "be able to become in its constructor" in {
val a = system.actorOf(Props(new Becomer { val a = system.actorOf(Props(new Becomer {
context.become { case always sender() ! always } context.become { case always sender() ! always }
def receive = { case always sender() ! "FAILURE" } def receive = { case _ sender() ! "FAILURE" }
})) }))
a ! "pigdog" a ! "pigdog"
expectMsg("pigdog") expectMsg("pigdog")
@ -28,7 +28,7 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
"be able to become multiple times in its constructor" in { "be able to become multiple times in its constructor" in {
val a = system.actorOf(Props(new Becomer { val a = system.actorOf(Props(new Becomer {
for (i 1 to 4) context.become({ case always sender() ! i + ":" + always }) for (i 1 to 4) context.become({ case always sender() ! i + ":" + always })
def receive = { case always sender() ! "FAILURE" } def receive = { case _ sender() ! "FAILURE" }
})) }))
a ! "pigdog" a ! "pigdog"
expectMsg("4:pigdog") expectMsg("4:pigdog")
@ -48,7 +48,7 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
"be able to become, with stacking, multiple times in its constructor" in { "be able to become, with stacking, multiple times in its constructor" in {
val a = system.actorOf(Props(new Becomer { val a = system.actorOf(Props(new Becomer {
for (i 1 to 4) context.become({ case always sender() ! i + ":" + always; context.unbecome() }, false) for (i 1 to 4) context.become({ case always sender() ! i + ":" + always; context.unbecome() }, false)
def receive = { case always sender() ! "FAILURE" } def receive = { case _ sender() ! "FAILURE" }
})) }))
a ! "pigdog" a ! "pigdog"
a ! "pigdog" a ! "pigdog"

View file

@ -12,8 +12,8 @@ object ReflectSpec {
final class A final class A
final class B final class B
class One(a: A) class One(@unused a: A)
class Two(a: A, b: B) class Two(@unused a: A, @unused b: B)
class MultipleOne(a: A, b: B) { class MultipleOne(a: A, b: B) {
def this(a: A) { this(a, null) } def this(a: A) { this(a, null) }

View file

@ -99,7 +99,7 @@ abstract class Props private[akka] () extends Product with Serializable {
@tailrec def select(d: Props, acc: List[Props]): List[Props] = @tailrec def select(d: Props, acc: List[Props]): List[Props] =
d match { d match {
case EmptyProps acc.reverse case EmptyProps acc.reverse
case t: T select(d.next, (d withNext EmptyProps) :: acc) case _: T select(d.next, (d withNext EmptyProps) :: acc)
case _ select(d.next, acc) case _ select(d.next, acc)
} }
select(this, Nil) select(this, Nil)
@ -114,7 +114,7 @@ abstract class Props private[akka] () extends Product with Serializable {
@tailrec def select(d: Props, acc: List[Props]): List[Props] = @tailrec def select(d: Props, acc: List[Props]): List[Props] =
d match { d match {
case EmptyProps acc case EmptyProps acc
case t: T select(d.next, acc) case _: T select(d.next, acc)
case _ select(d.next, d :: acc) case _ select(d.next, d :: acc)
} }
@tailrec def link(l: List[Props], acc: Props): Props = @tailrec def link(l: List[Props], acc: Props): Props =

View file

@ -12,7 +12,7 @@ import akka.actor.typed.BehaviorInterceptor.{ PreStartTarget, ReceiveTarget, Sig
import akka.actor.typed.SupervisorStrategy._ import akka.actor.typed.SupervisorStrategy._
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.util.OptionVal import akka.util.{ OptionVal, unused }
import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.reflect.ClassTag import scala.reflect.ClassTag
@ -30,7 +30,7 @@ import scala.util.control.NonFatal
case r: Restart case r: Restart
Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior) Behaviors.intercept[T, T](new RestartSupervisor(initialBehavior, r))(initialBehavior)
case r: Stop case r: Stop
Behaviors.intercept[T, T](new StopSupervisor(initialBehavior, r))(initialBehavior) Behaviors.intercept[T, T](new StopSupervisor(r))(initialBehavior)
case r: Backoff case r: Backoff
Behaviors.intercept[T, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior) Behaviors.intercept[T, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior)
} }
@ -86,7 +86,7 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
} catch handleReceiveException(ctx, target) } catch handleReceiveException(ctx, target)
} }
protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { protected def handleException(@unused ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr) case NonFatal(t: Thr)
Behavior.failed(t) Behavior.failed(t)
} }
@ -100,7 +100,7 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
handleException(ctx) handleException(ctx)
} }
private class StopSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { private class StopSupervisor[T, Thr <: Throwable: ClassTag](strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) {
override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = { override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
case NonFatal(t: Thr) case NonFatal(t: Thr)
log(ctx, t) log(ctx, t)
@ -137,13 +137,13 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
throw t throw t
} else { } else {
log(ctx, t) log(ctx, t)
restart(ctx, t) restart()
aroundStart(ctx, target) aroundStart(ctx, target)
} }
} }
} }
private def restart(ctx: ActorContext[_], t: Throwable) = { private def restart() = {
val timeLeft = deadlineHasTimeLeft val timeLeft = deadlineHasTimeLeft
val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange)
restarts = if (timeLeft) restarts + 1 else 1 restarts = if (timeLeft) restarts + 1 else 1
@ -161,7 +161,7 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart") case NonFatal(ex) ctx.asScala.log.error(ex, "failure during PreRestart")
} }
log(ctx, t) log(ctx, t)
restart(ctx, t) restart()
Behavior.validateAsInitial(Behavior.start(initial, ctx)) Behavior.validateAsInitial(Behavior.start(initial, ctx))
} }
} }

View file

@ -135,9 +135,9 @@ import akka.util.OptionVal
} }
override def unhandled(msg: Any): Unit = msg match { override def unhandled(msg: Any): Unit = msg match {
case t @ Terminated(ref) throw DeathPactException(ref) case Terminated(ref) throw DeathPactException(ref)
case msg: Signal // that's ok case _: Signal // that's ok
case other super.unhandled(other) case other super.unhandled(other)
} }
override val supervisorStrategy = untyped.OneForOneStrategy(loggingEnabled = false) { override val supervisorStrategy = untyped.OneForOneStrategy(loggingEnabled = false) {

View file

@ -60,8 +60,8 @@ private[akka] object ActorRefAdapter {
sysmsg.Watch( sysmsg.Watch(
toUntyped(watchee), toUntyped(watchee),
toUntyped(watcher))) toUntyped(watcher)))
case internal.Unwatch(watchee, watcher) untyped.sendSystemMessage(sysmsg.Unwatch(toUntyped(watchee), toUntyped(watcher))) case internal.Unwatch(watchee, watcher) untyped.sendSystemMessage(sysmsg.Unwatch(toUntyped(watchee), toUntyped(watcher)))
case internal.DeathWatchNotification(ref, cause) untyped.sendSystemMessage(sysmsg.DeathWatchNotification(toUntyped(ref), true, false)) case internal.DeathWatchNotification(ref, _) untyped.sendSystemMessage(sysmsg.DeathWatchNotification(toUntyped(ref), true, false))
case internal.NoMessage // just to suppress the warning case internal.NoMessage // just to suppress the warning
} }
} }

View file

@ -13,6 +13,7 @@ import akka.actor.typed.scaladsl
import akka.annotation.ApiMayChange import akka.annotation.ApiMayChange
import akka.japi.function.{ Function2 JapiFunction2 } import akka.japi.function.{ Function2 JapiFunction2 }
import akka.japi.pf.PFBuilder import akka.japi.pf.PFBuilder
import akka.util.unused
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.reflect.ClassTag import scala.reflect.ClassTag
@ -156,7 +157,7 @@ object Behaviors {
* @param type the supertype of all messages accepted by this behavior * @param type the supertype of all messages accepted by this behavior
* @return the behavior builder * @return the behavior builder
*/ */
def receive[T](`type`: Class[T]): BehaviorBuilder[T] = BehaviorBuilder.create[T] def receive[T](@unused `type`: Class[T]): BehaviorBuilder[T] = BehaviorBuilder.create[T]
/** /**
* Construct an actor behavior that can react to lifecycle signals only. * Construct an actor behavior that can react to lifecycle signals only.

View file

@ -6,14 +6,14 @@ package akka.actor.typed.scaladsl
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import scala.concurrent.Future
import akka.actor.{ Address, RootActorPath, Scheduler } import akka.actor.{ Address, RootActorPath, Scheduler }
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.internal.{ adapter adapt } import akka.actor.typed.internal.{ adapter adapt }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.pattern.PromiseActorRef import akka.pattern.PromiseActorRef
import akka.util.Timeout import akka.util.{ Timeout, unused }
import scala.concurrent.Future
import akka.actor.typed.RecipientRef import akka.actor.typed.RecipientRef
import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.InternalRecipientRef
@ -54,7 +54,7 @@ object AskPattern {
* val f: Future[Reply] = target ? replyTo => (Request("hello", replyTo)) * val f: Future[Reply] = target ? replyTo => (Request("hello", replyTo))
* }}} * }}}
*/ */
def ?[U](replyTo: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { def ?[U](replyTo: ActorRef[U] T)(implicit timeout: Timeout, @unused scheduler: Scheduler): Future[U] = {
// We do not currently use the implicit scheduler, but want to require it // We do not currently use the implicit scheduler, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219 // because it might be needed when we move to a 'native' typed runtime, see #24219
ref match { ref match {

View file

@ -97,7 +97,7 @@ trait Inbox { this: ActorDSL.type ⇒
case g: Get case g: Get
if (messages.isEmpty) enqueueQuery(g) if (messages.isEmpty) enqueueQuery(g)
else sender() ! messages.dequeue() else sender() ! messages.dequeue()
case s @ Select(_, predicate, _) case s: Select
if (messages.isEmpty) enqueueQuery(s) if (messages.isEmpty) enqueueQuery(s)
else { else {
currentSelect = s currentSelect = s

View file

@ -220,7 +220,7 @@ private[remote] class ReliableDeliverySupervisor(
settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery) settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e @ (_: AssociationProblem) Escalate case _: AssociationProblem Escalate
case NonFatal(e) case NonFatal(e)
val causedBy = if (e.getCause == null) "" else s"Caused by: [${e.getCause.getMessage}]" val causedBy = if (e.getCause == null) "" else s"Caused by: [${e.getCause.getMessage}]"
log.warning( log.warning(
@ -359,10 +359,10 @@ private[remote] class ReliableDeliverySupervisor(
// Resending will be triggered by the incoming GotUid message after the connection finished // Resending will be triggered by the incoming GotUid message after the connection finished
goToActive() goToActive()
} else goToIdle() } else goToIdle()
case AttemptSysMsgRedelivery // Ignore case AttemptSysMsgRedelivery // Ignore
case s @ Send(msg: SystemMessage, _, _, _) tryBuffer(s.copy(seqOpt = Some(nextSeq()))) case s @ Send(_: SystemMessage, _, _, _) tryBuffer(s.copy(seqOpt = Some(nextSeq())))
case s: Send context.system.deadLetters ! s case s: Send context.system.deadLetters ! s
case EndpointWriter.FlushAndStop context.stop(self) case EndpointWriter.FlushAndStop context.stop(self)
case EndpointWriter.StopReading(w, replyTo) case EndpointWriter.StopReading(w, replyTo)
replyTo ! EndpointWriter.StoppedReading(w) replyTo ! EndpointWriter.StoppedReading(w)
sender() ! EndpointWriter.StoppedReading(w) sender() ! EndpointWriter.StoppedReading(w)

View file

@ -10,6 +10,7 @@ import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope } import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder, OutboundEnvelope }
import akka.serialization._ import akka.serialization._
import akka.util.unused
import scala.util.control.NonFatal import scala.util.control.NonFatal
@ -83,8 +84,13 @@ private[akka] object MessageSerializer {
} finally Serialization.currentTransportInformation.value = oldInfo } finally Serialization.currentTransportInformation.value = oldInfo
} }
def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, def deserializeForArtery(
serializer: Int, classManifest: String, envelope: EnvelopeBuffer): AnyRef = { @unused system: ExtendedActorSystem,
@unused originUid: Long,
serialization: Serialization,
serializer: Int,
classManifest: String,
envelope: EnvelopeBuffer): AnyRef = {
serialization.deserializeByteBuffer(envelope.byteBuffer, serializer, classManifest) serialization.deserializeByteBuffer(envelope.byteBuffer, serializer, classManifest)
} }
} }

View file

@ -212,7 +212,7 @@ private[akka] class RemoteSystemDaemon(
private def doCreateActor(message: DaemonMsg, props: Props, deploy: Deploy, path: String, supervisor: ActorRef) = { private def doCreateActor(message: DaemonMsg, props: Props, deploy: Deploy, path: String, supervisor: ActorRef) = {
path match { path match {
case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" case ActorPathExtractor(_, elems) if elems.nonEmpty && elems.head == "remote"
// TODO RK currently the extracted address is just ignored, is that okay? // TODO RK currently the extracted address is just ignored, is that okay?
// TODO RK canonicalize path so as not to duplicate it always #1446 // TODO RK canonicalize path so as not to duplicate it always #1446
val subpath = elems.drop(1) val subpath = elems.drop(1)

View file

@ -8,10 +8,11 @@ import akka.AkkaException
import akka.Done import akka.Done
import akka.actor._ import akka.actor._
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.util.OptionVal import akka.util.{ OptionVal, unused }
/** /**
* RemoteTransportException represents a general failure within a RemoteTransport, * RemoteTransportException represents a general failure within a RemoteTransport,
@ -79,7 +80,7 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va
* @param cmd Command message to send to the transports. * @param cmd Command message to send to the transports.
* @return A Future that indicates when the message was successfully handled or dropped. * @return A Future that indicates when the message was successfully handled or dropped.
*/ */
def managementCommand(cmd: Any): Future[Boolean] = { Future.successful(false) } def managementCommand(@unused cmd: Any): Future[Boolean] = { Future.successful(false) }
/** /**
* A Logger that can be used to log issues that may occur * A Logger that can be used to log issues that may occur

View file

@ -106,7 +106,7 @@ private[remote] object Remoting {
private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
override def supervisorStrategy = OneForOneStrategy() { override def supervisorStrategy = OneForOneStrategy() {
case NonFatal(e) Restart case NonFatal(_) Restart
} }
def receive = { def receive = {
@ -327,7 +327,7 @@ private[remote] object EndpointManager {
def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = { def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = {
addressToWritable.get(remoteAddress) match { addressToWritable.get(remoteAddress) match {
case Some(Pass(ep, _)) addressToWritable += remoteAddress Pass(ep, Some(uid)) case Some(Pass(ep, _)) addressToWritable += remoteAddress Pass(ep, Some(uid))
case other case _
} }
} }
@ -482,7 +482,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
override val supervisorStrategy = { override val supervisorStrategy = {
def hopeless(e: HopelessAssociation): SupervisorStrategy.Directive = e match { def hopeless(e: HopelessAssociation): SupervisorStrategy.Directive = e match {
case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) case HopelessAssociation(_, remoteAddress, Some(uid), reason)
log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.", log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.",
remoteAddress, uid) remoteAddress, uid)
settings.QuarantineDuration match { settings.QuarantineDuration match {
@ -493,7 +493,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
Stop Stop
case HopelessAssociation(localAddress, remoteAddress, None, _) case HopelessAssociation(_, remoteAddress, None, _)
keepQuarantinedOr(remoteAddress) { keepQuarantinedOr(remoteAddress) {
log.warning( log.warning(
"Association to [{}] with unknown UID is irrecoverably failed. " + "Association to [{}] with unknown UID is irrecoverably failed. " +
@ -505,7 +505,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
OneForOneStrategy(loggingEnabled = false) { OneForOneStrategy(loggingEnabled = false) {
case e @ InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) case InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo)
keepQuarantinedOr(remoteAddress) { keepQuarantinedOr(remoteAddress) {
val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]" val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]"
log.warning( log.warning(
@ -522,7 +522,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
Stop Stop
case ShutDownAssociation(localAddress, remoteAddress, _) case ShutDownAssociation(_, remoteAddress, _)
keepQuarantinedOr(remoteAddress) { keepQuarantinedOr(remoteAddress) {
log.debug( log.debug(
"Remote system with address [{}] has shut down. " + "Remote system with address [{}] has shut down. " +
@ -655,7 +655,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
} }
case s @ Send(message, senderOption, recipientRef, _) case s @ Send(_, _, recipientRef, _)
val recipientAddress = recipientRef.path.address val recipientAddress = recipientRef.path.address
def createAndRegisterWritingEndpoint(): ActorRef = { def createAndRegisterWritingEndpoint(): ActorRef = {
@ -677,7 +677,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Some(Gated(timeOfRelease)) case Some(Gated(timeOfRelease))
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s
else extendedSystem.deadLetters ! s else extendedSystem.deadLetters ! s
case Some(Quarantined(uid, _)) case Some(Quarantined(_, _))
// timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
// the Quarantined tombstone and we know what UID we don't want to accept, so use it. // the Quarantined tombstone and we know what UID we don't want to accept, so use it.
createAndRegisterWritingEndpoint() ! s createAndRegisterWritingEndpoint() ! s
@ -686,7 +686,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
case ia @ InboundAssociation(handle: AkkaProtocolHandle) case ia @ InboundAssociation(_: AkkaProtocolHandle)
handleInboundAssociation(ia, writerIsIdle = false) handleInboundAssociation(ia, writerIsIdle = false)
case EndpointWriter.StoppedReading(endpoint) case EndpointWriter.StoppedReading(endpoint)
acceptPendingReader(takingOverFrom = endpoint) acceptPendingReader(takingOverFrom = endpoint)
@ -776,10 +776,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
endpoints.markAsQuarantined(handle.remoteAddress, uid, Deadline.now + settings.QuarantineDuration) endpoints.markAsQuarantined(handle.remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
createAndRegisterEndpoint(handle) createAndRegisterEndpoint(handle)
} }
case state case _
createAndRegisterEndpoint(handle) createAndRegisterEndpoint(handle)
} }
} }
case _ // ignore
} }
private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = { private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = {

View file

@ -24,7 +24,6 @@ import scala.util.Success
import scala.util.Try import scala.util.Try
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.Done import akka.Done
import akka.NotUsed import akka.NotUsed
import akka.actor.Actor import akka.actor.Actor
@ -56,8 +55,7 @@ import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.util.OptionVal import akka.util.{ OptionVal, WildcardIndex, unused }
import akka.util.WildcardIndex
/** /**
* INTERNAL API * INTERNAL API
@ -243,8 +241,11 @@ private[remote] object FlushOnShutdown {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, private[remote] class FlushOnShutdown(
inboundContext: InboundContext, associations: Set[Association]) extends Actor { done: Promise[Done],
timeout: FiniteDuration,
@unused inboundContext: InboundContext,
associations: Set[Association]) extends Actor {
var remaining = Map.empty[UniqueAddress, Int] var remaining = Map.empty[UniqueAddress, Int]

View file

@ -172,13 +172,10 @@ private[remote] class Association(
@volatile private[this] var queuesVisibility = false @volatile private[this] var queuesVisibility = false
private def controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(ControlQueueIndex) private def controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(ControlQueueIndex)
private def largeQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(LargeQueueIndex)
@volatile private[this] var _outboundControlIngress: OptionVal[OutboundControlIngress] = OptionVal.None @volatile private[this] var _outboundControlIngress: OptionVal[OutboundControlIngress] = OptionVal.None
@volatile private[this] var materializing = new CountDownLatch(1) @volatile private[this] var materializing = new CountDownLatch(1)
@volatile private[this] var outboundCompressionAccess: Vector[OutboundCompressionAccess] = Vector.empty @volatile private[this] var outboundCompressionAccess: Vector[OutboundCompressionAccess] = Vector.empty
// in case there is a restart at the same time as a compression table update
private val changeCompressionTimeout = 5.seconds
// keyed by stream queue index // keyed by stream queue index
private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues]) private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues])
@ -335,7 +332,7 @@ private[remote] class Association(
if (log.isDebugEnabled) { if (log.isDebugEnabled) {
val reason = val reason =
if (removed) "removed unused quarantined association" if (removed) "removed unused quarantined association"
else s"overflow of send queue, size [$queueSize]" else s"overflow of send queue, size [$qSize]"
log.debug( log.debug(
"Dropping message [{}] from [{}] to [{}] due to {}", "Dropping message [{}] from [{}] to [{}] due to {}",
Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), reason) Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), reason)
@ -718,7 +715,7 @@ private[remote] class Association(
queues(queueIndex) = wrapper // use new underlying queue immediately for restarts queues(queueIndex) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array queuesVisibility = true // volatile write for visibility of the queues array
val (queueValue, testMgmt, changeCompression, completed) = val (queueValue, _, changeCompression, completed) =
Source.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters)) Source.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters))
.via(streamKillSwitch.flow) .via(streamKillSwitch.flow)
.viaMat(transport.outboundTestFlow(this))(Keep.both) .viaMat(transport.outboundTestFlow(this))(Keep.both)

View file

@ -6,6 +6,10 @@ package akka.remote.artery
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import akka.Done import akka.Done
import akka.actor.{ EmptyLocalActorRef, _ } import akka.actor.{ EmptyLocalActorRef, _ }
import akka.event.Logging import akka.event.Logging
@ -17,11 +21,7 @@ import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRe
import akka.serialization.{ Serialization, SerializationExtension, Serializers } import akka.serialization.{ Serialization, SerializationExtension, Serializers }
import akka.stream._ import akka.stream._
import akka.stream.stage._ import akka.stream.stage._
import akka.util.{ OptionVal, Unsafe } import akka.util.{ OptionVal, Unsafe, unused }
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import akka.remote.artery.OutboundHandshake.HandshakeReq import akka.remote.artery.OutboundHandshake.HandshakeReq
/** /**
@ -43,7 +43,7 @@ private[remote] class Encoder(
system: ExtendedActorSystem, system: ExtendedActorSystem,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
bufferPool: EnvelopeBufferPool, bufferPool: EnvelopeBufferPool,
streamId: Int, @unused streamId: Int,
debugLogSend: Boolean, debugLogSend: Boolean,
version: Byte) version: Byte)
extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.OutboundCompressionAccess] { extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.OutboundCompressionAccess] {
@ -59,7 +59,6 @@ private[remote] class Encoder(
private val headerBuilder = HeaderBuilder.out() private val headerBuilder = HeaderBuilder.out()
headerBuilder.setVersion(version) headerBuilder.setVersion(version)
headerBuilder.setUid(uniqueLocalAddress.uid) headerBuilder.setUid(uniqueLocalAddress.uid)
private val localAddress = uniqueLocalAddress.address
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None private var _serialization: OptionVal[Serialization] = OptionVal.None
@ -331,7 +330,6 @@ private[remote] class Decoder(
override val compressions = inboundCompressions override val compressions = inboundCompressions
private val localAddress = inboundContext.localAddress.address
private val headerBuilder = HeaderBuilder.in(compressions) private val headerBuilder = HeaderBuilder.in(compressions)
private val actorRefResolver: ActorRefResolveCacheWithAddress = private val actorRefResolver: ActorRefResolveCacheWithAddress =
new ActorRefResolveCacheWithAddress(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress) new ActorRefResolveCacheWithAddress(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
@ -575,9 +573,9 @@ private[remote] class Decoder(
* INTERNAL API * INTERNAL API
*/ */
private[remote] class Deserializer( private[remote] class Deserializer(
inboundContext: InboundContext, @unused inboundContext: InboundContext,
system: ExtendedActorSystem, system: ExtendedActorSystem,
bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
val in: Inlet[InboundEnvelope] = Inlet("Artery.Deserializer.in") val in: Inlet[InboundEnvelope] = Inlet("Artery.Deserializer.in")
val out: Outlet[InboundEnvelope] = Outlet("Artery.Deserializer.out") val out: Outlet[InboundEnvelope] = Outlet("Artery.Deserializer.out")

View file

@ -4,20 +4,19 @@
package akka.remote.artery package akka.remote.artery
import akka.actor.ActorSystem
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Future
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.actor.ActorSystem
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.FlowShape import akka.stream.FlowShape
import akka.stream.Inlet import akka.stream.Inlet
import akka.stream.Outlet import akka.stream.Outlet
import akka.stream.stage._ import akka.stream.stage._
import akka.util.OptionVal import akka.util.{ OptionVal, unused }
import akka.Done import akka.Done
import scala.concurrent.Future
import akka.actor.Address import akka.actor.Address
/** /**
@ -50,7 +49,7 @@ private[remote] object OutboundHandshake {
* INTERNAL API * INTERNAL API
*/ */
private[remote] class OutboundHandshake( private[remote] class OutboundHandshake(
system: ActorSystem, @unused system: ActorSystem,
outboundContext: OutboundContext, outboundContext: OutboundContext,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
timeout: FiniteDuration, timeout: FiniteDuration,
@ -129,7 +128,7 @@ private[remote] class OutboundHandshake(
// when it receives the HandshakeRsp reply // when it receives the HandshakeRsp reply
implicit val ec = materializer.executionContext implicit val ec = materializer.executionContext
uniqueRemoteAddress.foreach { uniqueRemoteAddress.foreach {
getAsyncCallback[UniqueAddress] { a getAsyncCallback[UniqueAddress] { _
if (handshakeState != Completed) { if (handshakeState != Completed) {
handshakeCompleted() handshakeCompleted()
if (isAvailable(out)) if (isAvailable(out))

View file

@ -41,7 +41,6 @@ private[remote] class MessageDispatcher(
} }
val sender: ActorRef = senderOption.getOrElse(system.deadLetters) val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
val originalReceiver = recipient.path
recipient match { recipient match {

View file

@ -4,12 +4,13 @@
package akka.remote.artery package akka.remote.artery
import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.event.{ Logging, LoggingAdapter }
import akka.util.OptionVal
import java.nio.ByteBuffer import java.nio.ByteBuffer
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.event.{ Logging, LoggingAdapter }
import akka.util.{ OptionVal, unused }
/** /**
* INTERNAL API * INTERNAL API
@ -287,7 +288,7 @@ private[remote] object RemoteInstruments {
def getKey(kl: Int): Byte = (kl >>> 26).toByte def getKey(kl: Int): Byte = (kl >>> 26).toByte
def getLength(kl: Int): Int = kl & lengthMask def getLength(kl: Int): Int = kl & lengthMask
def create(system: ExtendedActorSystem, log: LoggingAdapter): Vector[RemoteInstrument] = { def create(system: ExtendedActorSystem, @unused log: LoggingAdapter): Vector[RemoteInstrument] = {
val c = system.settings.config val c = system.settings.config
val path = "akka.remote.artery.advanced.instruments" val path = "akka.remote.artery.advanced.instruments"
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._

View file

@ -91,7 +91,6 @@ import akka.util.OptionVal
private var incarnation = outboundContext.associationState.incarnation private var incarnation = outboundContext.associationState.incarnation
private val unacknowledged = new ArrayDeque[OutboundEnvelope] private val unacknowledged = new ArrayDeque[OutboundEnvelope]
private var resending = new ArrayDeque[OutboundEnvelope] private var resending = new ArrayDeque[OutboundEnvelope]
private var resendingFromSeqNo = -1L
private var stopping = false private var stopping = false
private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos
@ -288,7 +287,6 @@ import akka.util.OptionVal
incarnation = outboundContext.associationState.incarnation incarnation = outboundContext.associationState.incarnation
unacknowledged.clear() unacknowledged.clear()
resending.clear() resending.clear()
resendingFromSeqNo = -1L
cancelTimer(resendInterval) cancelTimer(resendInterval)
} }

View file

@ -34,7 +34,7 @@ private[remote] object AeronSource {
() ()
{ {
handler.reset handler.reset
val fragmentsRead = sub.poll(handler.fragmentsHandler, 1) sub.poll(handler.fragmentsHandler, 1)
val msg = handler.messageReceived val msg = handler.messageReceived
handler.reset() // for GC handler.reset() // for GC
if (msg ne null) { if (msg ne null) {

View file

@ -14,7 +14,7 @@ import akka.actor.Address
import akka.event.Logging import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.remote.artery._ import akka.remote.artery._
import akka.util.OptionVal import akka.util.{ OptionVal, unused }
import org.agrona.collections.Long2ObjectHashMap import org.agrona.collections.Long2ObjectHashMap
/** /**
@ -376,7 +376,7 @@ private[remote] abstract class InboundCompression[T >: Null](
* Add `n` occurrence for the given key and call `heavyHittedDetected` if element has become a heavy hitter. * Add `n` occurrence for the given key and call `heavyHittedDetected` if element has become a heavy hitter.
* Empty keys are omitted. * Empty keys are omitted.
*/ */
def increment(remoteAddress: Address, value: T, n: Long): Unit = { def increment(@unused remoteAddress: Address, value: T, n: Long): Unit = {
val count = cms.addObjectAndEstimateCount(value, n) val count = cms.addObjectAndEstimateCount(value, n)
addAndCheckIfheavyHitterDetected(value, count) addAndCheckIfheavyHitterDetected(value, count)
alive = true alive = true

View file

@ -169,7 +169,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
true true
} else { } else {
// The entry exists, let's update it. // The entry exists, let's update it.
updateExistingHeavyHitter(actualIdx, hashCode, item, count) updateExistingHeavyHitter(actualIdx, count)
// not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before) // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before)
false false
} }
@ -220,7 +220,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
* Replace existing heavy hitter give it a new `count` value. This will also restore the heap property, so this * Replace existing heavy hitter give it a new `count` value. This will also restore the heap property, so this
* might make a previously lowest hitter no longer be one. * might make a previously lowest hitter no longer be one.
*/ */
private def updateExistingHeavyHitter(foundHashIndex: Int, hashCode: HashCodeVal, item: T, count: Long): Unit = { private def updateExistingHeavyHitter(foundHashIndex: Int, count: Long): Unit = {
if (weights(foundHashIndex) > count) if (weights(foundHashIndex) > count)
throw new IllegalArgumentException(s"Weights can be only incremented or kept the same, not decremented. " + throw new IllegalArgumentException(s"Weights can be only incremented or kept the same, not decremented. " +
s"Previous weight was [${weights(foundHashIndex)}], attempted to modify it to [$count].") s"Previous weight was [${weights(foundHashIndex)}], attempted to modify it to [$count].")

View file

@ -41,10 +41,8 @@ import akka.stream.scaladsl.RestartFlow
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Tcp import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Tcp.IncomingConnection
import akka.stream.scaladsl.Tcp.ServerBinding import akka.stream.scaladsl.Tcp.ServerBinding
import akka.util.ByteString import akka.util.{ ByteString, OptionVal }
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -242,7 +240,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
// If something in the inboundConnectionFlow fails, e.g. framing, the connection will be teared down, // If something in the inboundConnectionFlow fails, e.g. framing, the connection will be teared down,
// but other parts of the inbound streams don't have to restarted. // but other parts of the inbound streams don't have to restarted.
def inboundConnectionFlow(inboundConnection: IncomingConnection): Flow[ByteString, ByteString, NotUsed] = { def inboundConnectionFlow: Flow[ByteString, ByteString, NotUsed] = {
// must create new Flow for each connection because of the FlightRecorder that can't be shared // must create new Flow for each connection because of the FlightRecorder that can't be shared
val afr = createFlightRecorderEventSink() val afr = createFlightRecorderEventSink()
Flow[ByteString] Flow[ByteString]
@ -279,7 +277,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
afr.loFreq( afr.loFreq(
TcpInbound_Connected, TcpInbound_Connected,
s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}") s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}")
connection.handleWith(inboundConnectionFlow(connection)) connection.handleWith(inboundConnectionFlow)
}) })
.run() .run()
.recoverWith { .recoverWith {
@ -329,8 +327,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
.toMat(inboundControlSink)({ case (a, (c, d)) (a, c, d) }) .toMat(inboundControlSink)({ case (a, (c, d)) (a, c, d) })
.run()(controlMaterializer) .run()(controlMaterializer)
attachControlMessageObserver(ctrl) attachControlMessageObserver(ctrl)
implicit val ec: ExecutionContext = materializer.executionContext updateStreamMatValues(completed)
updateStreamMatValues(ControlStreamId, completed)
(hub, completed) (hub, completed)
} }
@ -380,7 +377,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
setInboundCompressionAccess(inboundCompressionAccess) setInboundCompressionAccess(inboundCompressionAccess)
updateStreamMatValues(OrdinaryStreamId, completed) updateStreamMatValues(completed)
(inboundHub, completed) (inboundHub, completed)
} }
@ -395,12 +392,12 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
.toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both) .toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both)
.run()(materializer) .run()(materializer)
updateStreamMatValues(LargeStreamId, completed) updateStreamMatValues(completed)
(hub, completed) (hub, completed)
} }
private def updateStreamMatValues(streamId: Int, completed: Future[Done]): Unit = { private def updateStreamMatValues(completed: Future[Done]): Unit = {
implicit val ec: ExecutionContext = materializer.executionContext implicit val ec: ExecutionContext = materializer.executionContext
updateStreamMatValues(ControlStreamId, InboundStreamMatValues[NotUsed]( updateStreamMatValues(ControlStreamId, InboundStreamMatValues[NotUsed](
NotUsed, NotUsed,

View file

@ -24,8 +24,6 @@ import akka.event.LogMarker
import akka.event.Logging import akka.event.Logging
import akka.event.MarkerLoggingAdapter import akka.event.MarkerLoggingAdapter
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.stream.IgnoreComplete
import akka.stream.TLSClosing
import akka.stream.TLSRole import akka.stream.TLSRole
import com.typesafe.config.Config import com.typesafe.config.Config
import javax.net.ssl.KeyManager import javax.net.ssl.KeyManager
@ -158,8 +156,7 @@ class SslTransportException(message: String, cause: Throwable) extends RuntimeEx
sslContext: SSLContext, sslContext: SSLContext,
role: TLSRole, role: TLSRole,
hostname: String, hostname: String,
port: Int, port: Int): SSLEngine = {
closing: TLSClosing = IgnoreComplete): SSLEngine = {
val engine = sslContext.createSSLEngine(hostname, port) val engine = sslContext.createSSLEngine(hostname, port)

View file

@ -69,9 +69,9 @@ final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) exten
override def resizer: Option[Resizer] = local.resizer override def resizer: Option[Resizer] = local.resizer
override def withFallback(other: RouterConfig): RouterConfig = other match { override def withFallback(other: RouterConfig): RouterConfig = other match {
case RemoteRouterConfig(local: RemoteRouterConfig, nodes) throw new IllegalStateException( case RemoteRouterConfig(_: RemoteRouterConfig, _) throw new IllegalStateException(
"RemoteRouterConfig is not allowed to wrap a RemoteRouterConfig") "RemoteRouterConfig is not allowed to wrap a RemoteRouterConfig")
case RemoteRouterConfig(local: Pool, nodes) case RemoteRouterConfig(local: Pool, _)
copy(local = this.local.withFallback(local).asInstanceOf[Pool]) copy(local = this.local.withFallback(local).asInstanceOf[Pool])
case _ copy(local = this.local.withFallback(other).asInstanceOf[Pool]) case _ copy(local = this.local.withFallback(other).asInstanceOf[Pool])
} }

View file

@ -13,7 +13,7 @@ import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider import akka.actor.ExtensionIdProvider
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
import akka.remote.artery.LruBoundedCache import akka.remote.artery.LruBoundedCache
import akka.util.Unsafe import akka.util.{ Unsafe, unused }
/** /**
* INTERNAL API: Thread local cache per actor system * INTERNAL API: Thread local cache per actor system
@ -45,7 +45,7 @@ private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSys
override def initialValue: ActorRefResolveCache = new ActorRefResolveCache(provider) override def initialValue: ActorRefResolveCache = new ActorRefResolveCache(provider)
} }
def threadLocalCache(provider: RemoteActorRefProvider): ActorRefResolveCache = def threadLocalCache(@unused provider: RemoteActorRefProvider): ActorRefResolveCache =
current.get current.get
} }

View file

@ -6,11 +6,11 @@ package akka.remote.serialization
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest } import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest }
import akka.protobuf.ByteString import akka.protobuf.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.{ Deploy, ExtendedActorSystem, NoScopeGiven, Props, Scope } import akka.actor.{ Deploy, ExtendedActorSystem, NoScopeGiven, Props, Scope }
import akka.remote.DaemonMsgCreate import akka.remote.DaemonMsgCreate
import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData } import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData }
import akka.routing.{ NoRouter, RouterConfig } import akka.routing.{ NoRouter, RouterConfig }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.reflect.ClassTag import scala.reflect.ClassTag
import util.{ Failure, Success } import util.{ Failure, Success }
@ -177,8 +177,6 @@ private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSys
supervisor = deserializeActorRef(system, proto.getSupervisor)) supervisor = deserializeActorRef(system, proto.getSupervisor))
} }
private def oldSerialize(any: Any): ByteString = ByteString.copyFrom(serialization.serialize(any.asInstanceOf[AnyRef]).get)
private def serialize(any: Any): (Int, Boolean, String, Array[Byte]) = { private def serialize(any: Any): (Int, Boolean, String, Array[Byte]) = {
val m = any.asInstanceOf[AnyRef] val m = any.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(m) val serializer = serialization.findSerializerFor(m)

View file

@ -554,7 +554,7 @@ private[transport] class ProtocolStateActor(
} }
onTermination { onTermination {
case StopEvent(reason, _, OutboundUnassociated(remoteAddress, statusPromise, transport)) case StopEvent(reason, _, OutboundUnassociated(_, statusPromise, _))
statusPromise.tryFailure(reason match { statusPromise.tryFailure(reason match {
case FSM.Failure(info: DisassociateInfo) disassociateException(info) case FSM.Failure(info: DisassociateInfo) disassociateException(info)
case _ new AkkaProtocolException("Transport disassociated before handshake finished") case _ new AkkaProtocolException("Transport disassociated before handshake finished")
@ -611,7 +611,7 @@ private[transport] class ProtocolStateActor(
case FSM.Failure(ForbiddenUidReason) // no logging case FSM.Failure(ForbiddenUidReason) // no logging
case FSM.Failure(TimeoutReason(errorMessage)) case FSM.Failure(TimeoutReason(errorMessage))
log.info(errorMessage) log.info(errorMessage)
case other super.logTermination(reason) case _ super.logTermination(reason)
} }
private def listenForListenerRegistration(readHandlerPromise: Promise[HandleEventListener]): Unit = private def listenForListenerRegistration(readHandlerPromise: Promise[HandleEventListener]): Unit =

View file

@ -4,16 +4,17 @@
package akka.remote.transport package akka.remote.transport
import TestTransport._ import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap }
import akka.actor._ import akka.actor._
import akka.remote.transport.AssociationHandle._ import akka.remote.transport.AssociationHandle._
import akka.remote.transport.Transport._ import akka.remote.transport.Transport._
import akka.util.ByteString import akka.util.ByteString
import com.typesafe.config.Config import com.typesafe.config.Config
import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap } import TestTransport._
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global
/** /**
@ -38,8 +39,6 @@ class TestTransport(
conf.getString("scheme-identifier")) conf.getString("scheme-identifier"))
} }
import akka.remote.transport.TestTransport._
override def isResponsibleFor(address: Address): Boolean = true override def isResponsibleFor(address: Address): Boolean = true
private val associationListenerPromise = Promise[AssociationEventListener]() private val associationListenerPromise = Promise[AssociationEventListener]()
@ -218,7 +217,7 @@ object TestTransport {
* The constant the future will be completed with. * The constant the future will be completed with.
*/ */
def pushConstant(c: B): Unit = push { def pushConstant(c: B): Unit = push {
(x) Future.successful(c) _ Future.successful(c)
} }
/** /**
@ -228,7 +227,7 @@ object TestTransport {
* The throwable the failed future will contain. * The throwable the failed future will contain.
*/ */
def pushError(e: Throwable): Unit = push { def pushError(e: Throwable): Unit = push {
(x) Future.failed(e) _ Future.failed(e)
} }
/** /**
@ -243,7 +242,7 @@ object TestTransport {
val originalBehavior = currentBehavior val originalBehavior = currentBehavior
push( push(
(params: A) for (delayed controlPromise.future; original originalBehavior(params)) yield original) (params: A) controlPromise.future.flatMap(_ originalBehavior(params)))
controlPromise controlPromise
} }

View file

@ -496,7 +496,7 @@ private[transport] class ThrottledAssociation(
inboundThrottleMode = mode inboundThrottleMode = mode
sender() ! SetThrottleAck sender() ! SetThrottleAck
stay() stay()
case Event(Disassociated(info), _) case Event(Disassociated(_), _)
stop() // not notifying the upstream handler is intentional: we are relying on heartbeating stop() // not notifying the upstream handler is intentional: we are relying on heartbeating
case Event(FailWith(reason), _) case Event(FailWith(reason), _)
if (upstreamListener ne null) upstreamListener notify Disassociated(reason) if (upstreamListener ne null) upstreamListener notify Disassociated(reason)
@ -513,7 +513,7 @@ private[transport] class ThrottledAssociation(
} catch { } catch {
// This layer should not care about malformed packets. Also, this also useful for testing, because // This layer should not care about malformed packets. Also, this also useful for testing, because
// arbitrary payload could be passed in // arbitrary payload could be passed in
case NonFatal(e) None case NonFatal(_) None
} }
} }

View file

@ -5,12 +5,12 @@
package akka.remote.transport package akka.remote.transport
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
import scala.util.control.NoStackTrace
import akka.actor.{ ActorRef, Address, NoSerializationVerificationNeeded } import akka.actor.{ ActorRef, Address, NoSerializationVerificationNeeded }
import akka.util.ByteString import akka.util.{ ByteString, unused }
import akka.remote.transport.AssociationHandle.HandleEventListener import akka.remote.transport.AssociationHandle.HandleEventListener
import akka.AkkaException import akka.AkkaException
import scala.util.control.NoStackTrace
import akka.actor.DeadLetterSuppression import akka.actor.DeadLetterSuppression
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
@ -143,7 +143,7 @@ trait Transport {
* @param cmd Command message to the transport * @param cmd Command message to the transport
* @return Future that succeeds when the command was handled or dropped * @return Future that succeeds when the command was handled or dropped
*/ */
def managementCommand(cmd: Any): Future[Boolean] = { Future.successful(false) } def managementCommand(@unused cmd: Any): Future[Boolean] = { Future.successful(false) }
} }

View file

@ -6,7 +6,10 @@ package akka.remote.transport.netty
import akka.AkkaException import akka.AkkaException
import java.nio.channels.ClosedChannelException import java.nio.channels.ClosedChannelException
import akka.util.unused
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**
@ -14,17 +17,17 @@ import scala.util.control.NonFatal
*/ */
private[netty] trait NettyHelpers { private[netty] trait NettyHelpers {
protected def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = () protected def onConnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = () protected def onDisconnect(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = () protected def onOpen(@unused ctx: ChannelHandlerContext, @unused e: ChannelStateEvent): Unit = ()
protected def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = () protected def onMessage(@unused ctx: ChannelHandlerContext, @unused e: MessageEvent): Unit = ()
protected def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = () protected def onException(@unused ctx: ChannelHandlerContext, @unused e: ExceptionEvent): Unit = ()
final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = { final protected def transformException(@unused ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = {
val cause = if (ev.getCause ne null) ev.getCause else new AkkaException("Unknown cause") val cause = if (ev.getCause ne null) ev.getCause else new AkkaException("Unknown cause")
cause match { cause match {
case _: ClosedChannelException // Ignore case _: ClosedChannelException // Ignore

View file

@ -505,7 +505,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
} catch { } catch {
case NonFatal(e) { case NonFatal(e) {
log.error("failed to bind to {}, shutting down Netty transport", address) log.error("failed to bind to {}, shutting down Netty transport", address)
try { shutdown() } catch { case NonFatal(e) } // ignore possible exception during shutdown try { shutdown() } catch { case NonFatal(_) } // ignore possible exception during shutdown
throw e throw e
} }
} }

View file

@ -20,8 +20,6 @@ import akka.event.Logging
import akka.event.MarkerLoggingAdapter import akka.event.MarkerLoggingAdapter
import akka.remote.RemoteTransportException import akka.remote.RemoteTransportException
import akka.remote.artery.tcp.SecureRandomFactory import akka.remote.artery.tcp.SecureRandomFactory
import akka.stream.IgnoreComplete
import akka.stream.TLSClosing
import akka.stream.TLSRole import akka.stream.TLSRole
import javax.net.ssl.KeyManager import javax.net.ssl.KeyManager
import javax.net.ssl.KeyManagerFactory import javax.net.ssl.KeyManagerFactory
@ -108,10 +106,7 @@ import javax.net.ssl.TrustManagerFactory
createSSLEngine(sslContext, role) createSSLEngine(sslContext, role)
} }
private def createSSLEngine( private def createSSLEngine(sslContext: SSLContext, role: TLSRole): SSLEngine = {
sslContext: SSLContext,
role: TLSRole,
closing: TLSClosing = IgnoreComplete): SSLEngine = {
val engine = sslContext.createSSLEngine() val engine = sslContext.createSSLEngine()

View file

@ -85,7 +85,7 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout {
Await.result(behavior(()), timeout.duration) should ===(3) Await.result(behavior(()), timeout.duration) should ===(3)
} }
"enable delayed completition" in { "enable delayed completion" in {
val behavior = defaultBehavior val behavior = defaultBehavior
val controlPromise = behavior.pushDelayed val controlPromise = behavior.pushDelayed
val f = behavior(()) val f = behavior(())
@ -96,7 +96,7 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout {
awaitCond(f.isCompleted) awaitCond(f.isCompleted)
} }
"log calls and parametrers" in { "log calls and parameters" in {
val logPromise = Promise[Int]() val logPromise = Promise[Int]()
val behavior = new SwitchableLoggedBehavior[Int, Int]((i) Future.successful(3), (i) logPromise.success(i)) val behavior = new SwitchableLoggedBehavior[Int, Int]((i) Future.successful(3), (i) logPromise.success(i))

View file

@ -8,8 +8,6 @@ import java.util.concurrent.ThreadFactory
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import com.typesafe.config.Config
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
@ -19,6 +17,8 @@ import scala.util.Try
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.actor.Scheduler import akka.actor.Scheduler
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.util.unused
import com.typesafe.config.Config
/** /**
* For testing: scheduler that does not look at the clock, but must be * For testing: scheduler that does not look at the clock, but must be
@ -30,7 +30,7 @@ import akka.event.LoggingAdapter
* easier, but these tests might fail to catch race conditions that only * easier, but these tests might fail to catch race conditions that only
* happen when tasks are scheduled in parallel in 'real time'. * happen when tasks are scheduled in parallel in 'real time'.
*/ */
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler { class ExplicitlyTriggeredScheduler(@unused config: Config, log: LoggingAdapter, @unused tf: ThreadFactory) extends Scheduler {
private case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable) private case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable)