diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 1bb919e01a..b11a12e729 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -12,7 +12,6 @@ import akka.actor._ import akka.testkit.{ EventFilter, AkkaSpec } import scala.concurrent.{ Future, Await, ExecutionContext } import scala.concurrent.duration._ -import akka.dispatch.{ UnboundedMailbox, BoundedMailbox, SingleConsumerOnlyUnboundedMailbox } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 23c0e244ff..de119e4a84 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -1,5 +1,6 @@ package akka.performance.microbench +import language.postfixOps import akka.performance.workbench.PerformanceSpec import akka.actor._ import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 66ba4083f4..434719c35b 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -3,6 +3,7 @@ */ package akka.routing +import language.postfixOps import scala.concurrent.Await import scala.concurrent.duration._ import scala.collection.immutable diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java index 46c0d08e1f..3abfc65d56 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java @@ -5,14 +5,17 @@ package akka.dispatch; import akka.util.Unsafe; + import java.util.concurrent.atomic.AtomicReference; /** * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue: * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue */ +@SuppressWarnings("serial") public abstract class AbstractNodeQueue extends AtomicReference> { // Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically + @SuppressWarnings("unused") private volatile Node _tailDoNotCallMeDirectly; protected AbstractNodeQueue() { @@ -63,7 +66,6 @@ public abstract class AbstractNodeQueue extends AtomicReference next = peekNode(); if (next == null) return null; @@ -106,6 +108,7 @@ public abstract class AbstractNodeQueue extends AtomicReference { public T value; + @SuppressWarnings("unused") private volatile Node _nextDoNotCallMeDirectly; public Node() { diff --git a/akka-actor/src/main/java/akka/japi/pf/FSMStateFunctionBuilder.java b/akka-actor/src/main/java/akka/japi/pf/FSMStateFunctionBuilder.java index 0a571abe82..d6e900cb37 100644 --- a/akka-actor/src/main/java/akka/japi/pf/FSMStateFunctionBuilder.java +++ b/akka-actor/src/main/java/akka/japi/pf/FSMStateFunctionBuilder.java @@ -6,6 +6,7 @@ package akka.japi.pf; import akka.actor.FSM; import scala.PartialFunction; + import java.util.List; /** @@ -14,6 +15,7 @@ import java.util.List; * @param the state type * @param the data type */ +@SuppressWarnings("rawtypes") public class FSMStateFunctionBuilder { private PFBuilder, FSM.State> builder = diff --git a/akka-actor/src/main/scala/akka/Main.scala b/akka-actor/src/main/scala/akka/Main.scala index 57dedd6897..7c635edc54 100644 --- a/akka-actor/src/main/scala/akka/Main.scala +++ b/akka-actor/src/main/scala/akka/Main.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal object Main { /** - * @params args one argument: the class of the application supervisor actor + * @param args one argument: the class of the application supervisor actor */ def main(args: Array[String]): Unit = { if (args.length != 1) { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index cc51b239e7..f098741556 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -118,7 +118,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * Sends the specified message to the sender, i.e. fire-and-forget * semantics, including the sender reference if possible. * - * Pass [[ActorRef#noSender]] or `null` as sender if there is nobody to reply to + * Pass [[akka.actor.ActorRef$.noSender]] or `null` as sender if there is nobody to reply to */ final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender) @@ -476,7 +476,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, override val path: ActorPath, val eventStream: EventStream) extends MinimalActorRef { - @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated(): Boolean = true + @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated = true override def sendSystemMessage(message: SystemMessage): Unit = { if (Mailbox.debug) println(s"ELAR $path having enqueued $message") diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 33b9696375..d0dcb3f512 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -75,7 +75,7 @@ object ActorSystem { /** * Creates a new ActorSystem with the specified name, the specified Config, the specified ClassLoader, * and the specified ExecutionContext. The ExecutionContext will be used as the default executor inside this ActorSystem. - * If [[null]] is passed in for the Config, ClassLoader and/or ExecutionContext parameters, the respective default value + * If `null` is passed in for the Config, ClassLoader and/or ExecutionContext parameters, the respective default value * will be used. If no Config is given, the default reference config will be obtained from the ClassLoader. * If no ClassLoader is given, it obtains the current ClassLoader by first inspecting the current * threads' getContextClassLoader, then tries to walk the stack to find the callers class loader, then diff --git a/akka-actor/src/main/scala/akka/routing/Router.scala b/akka-actor/src/main/scala/akka/routing/Router.scala index 3c8cc3f2c5..411e0b269b 100644 --- a/akka-actor/src/main/scala/akka/routing/Router.scala +++ b/akka-actor/src/main/scala/akka/routing/Router.scala @@ -38,7 +38,7 @@ trait Routee { } /** - * [[Routee]] that sends the messages to an [[akka.actor.ActorRefRoutee]]. + * [[Routee]] that sends the messages to an [[akka.actor.ActorRef]]. */ case class ActorRefRoutee(ref: ActorRef) extends Routee { override def send(message: Any, sender: ActorRef): Unit = diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 3c04f87214..5ee47f7821 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -43,9 +43,12 @@ object Helpers { /** * Converts a "currentTimeMillis"-obtained timestamp accordingly: + * {{{ * "$hours%02d:$minutes%02d:$seconds%02d.$ms%03dUTC" + * }}} + * * @param timestamp a "currentTimeMillis"-obtained timestamp - * @return A String formatted like: $hours%02d:$minutes%02d:$seconds%02d.$ms%03dUTC + * @return the formatted timestamp */ def currentTimeMillisToUTCString(timestamp: Long): String = { val timeOfDay = timestamp % 86400000L diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala index 951bb1cdc0..f35f0a9ce6 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala @@ -265,7 +265,7 @@ class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extensio else { val numberOfContacts: Int = config.getInt("number-of-contacts") val responseTunnelReceiveTimeout = - Duration(config.getMilliseconds("response-tunnel-receive-timeout"), MILLISECONDS) + config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis val name = config.getString("name") // important to use val mediator here to activate it outside of ClusterReceptionist constructor val mediator = pubSubMediator diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index e8618be5dc..8091396b39 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -180,11 +180,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } val HasNecessaryClusterRole: Boolean = Role.forall(cluster.selfRoles.contains) val GuardianName: String = config.getString("guardian-name") - val RetryInterval: FiniteDuration = Duration(config.getMilliseconds("retry-interval"), MILLISECONDS) + val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis val BufferSize: Int = config.getInt("buffer-size") - val HandOffTimeout: FiniteDuration = Duration(config.getMilliseconds("handoff-timeout"), MILLISECONDS) - val RebalanceInterval: FiniteDuration = Duration(config.getMilliseconds("rebalance-interval"), MILLISECONDS) - val SnapshotInterval: FiniteDuration = Duration(config.getMilliseconds("snapshot-interval"), MILLISECONDS) + val HandOffTimeout: FiniteDuration = config.getDuration("handoff-timeout", MILLISECONDS).millis + val RebalanceInterval: FiniteDuration = config.getDuration("rebalance-interval", MILLISECONDS).millis + val SnapshotInterval: FiniteDuration = config.getDuration("snapshot-interval", MILLISECONDS).millis val LeastShardAllocationRebalanceThreshold: Int = config.getInt("least-shard-allocation-strategy.rebalance-threshold") val LeastShardAllocationMaxSimultaneousRebalance: Int = @@ -868,7 +868,7 @@ object ShardCoordinator { * @param shardId the id of the shard to allocate * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, * in the order they were allocated - * @retur the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of * the references included in the `currentShardAllocations` parameter */ def allocateShard(requester: ActorRef, shardId: ShardId, @@ -910,7 +910,7 @@ object ShardCoordinator { * @param shardId the id of the shard to allocate * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, * in the order they were allocated - * @retur the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of * the references included in the `currentShardAllocations` parameter */ def allocateShard(requester: ActorRef, shardId: String, diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index c69e362e82..f8b4366051 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -501,8 +501,8 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension case "broadcast" ⇒ BroadcastRoutingLogic() case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]") } - val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS) - val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS) + val gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis + val removedTimeToLive = config.getDuration("removed-time-to-live", MILLISECONDS).millis val maxDeltaElements = config.getInt("max-delta-elements") val name = config.getString("name") system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements), diff --git a/akka-persistence/src/main/scala/akka/persistence/View.scala b/akka-persistence/src/main/scala/akka/persistence/View.scala index 29b8f05469..8a4d484d27 100644 --- a/akka-persistence/src/main/scala/akka/persistence/View.scala +++ b/akka-persistence/src/main/scala/akka/persistence/View.scala @@ -52,7 +52,7 @@ case object Update { * view can also run on a different node, provided that a replicated journal is used. Implementation * classes reference a processor by implementing `processorId`. * - * Views can also store snapshots of internal state by calling [[saveSnapshot]]. The snapshots of a view + * Views can also store snapshots of internal state by calling [[#saveSnapshot]]. The snapshots of a view * are independent of those of the referenced processor. During recovery, a saved snapshot is offered * to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger * than the snapshot. Default is to offer the latest saved snapshot. @@ -63,9 +63,8 @@ case object Update { * `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional * view updates by sending the view [[Update]] requests. See also methods * - * - [[autoUpdate]] for turning automated updates on or off - * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle and - * - [[autoRecoveryReplayMax]] for limiting the number of replayed messages on initial view recovery + * - [[#autoUpdate]] for turning automated updates on or off + * - [[#autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle * * Views can also use channels to communicate with destinations in the same way as processors can do. */ @@ -174,7 +173,7 @@ trait View extends Actor with Recovery { viewSettings.autoUpdateReplayMax /** - * Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoRecoveryReplayMax` + * Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax` * messages (following that snapshot). */ override def preStart(): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index b447353aa1..b741e72b6f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -79,7 +79,7 @@ final class RemoteSettings(val config: Config) { } requiring (_ > 0, "system-message-buffer-size must be > 0") val QuarantineDuration: FiniteDuration = { - Duration(getMilliseconds("akka.remote.prune-quarantine-marker-after"), MILLISECONDS).requiring(_ > Duration.Zero, + config.getMillisDuration("akka.remote.prune-quarantine-marker-after").requiring(_ > Duration.Zero, "prune-quarantine-marker-after must be > 0 ms") } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 5be4c87a84..a7ebe97691 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -221,6 +221,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode // Not used anywhere only to keep compatibility with RemoteTransport interface + @deprecated("Use the LogRemoteLifecycleEvents setting instead.", "2.3") protected def logRemoteLifeCycleEvents: Boolean = LogRemoteLifecycleEvents } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 937274b7a1..20c3a13a8b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -141,9 +141,8 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS // Write once variable initialized when Listen is called. @volatile protected var manager: ActorRef = _ - // FIXME #3074 how to replace actorFor here? private def registerManager(): Future[ActorRef] = - (system.actorFor("/system/transports") ? RegisterTransportActor(managerProps, managerName)).mapTo[ActorRef] + (system.actorSelection("/system/transports") ? RegisterTransportActor(managerProps, managerName)).mapTo[ActorRef] override def interceptListen(listenAddress: Address, listenerPromise: Future[AssociationEventListener]): Future[AssociationEventListener] = { diff --git a/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala b/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala index f27eb6a94c..04309d9cce 100644 --- a/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/DaemonicSpec.scala @@ -36,6 +36,7 @@ class DaemonicSpec extends AkkaSpec { akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" akka.remote.netty.tcp.port = 0 + akka.log-dead-letters-during-shutdown = off """)) val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort)) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index 1ee8254960..5d00cb5092 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -37,6 +37,8 @@ akka { shutdown(other) } + override def expectedTestDuration: FiniteDuration = 90.seconds + "receive Terminated when watched node is unknown host" in { val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject" system.actorOf(Props(new Actor { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 86a59a693d..776d1ed4d4 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -74,6 +74,7 @@ object RemoteWatcherSpec { class RemoteWatcherSpec extends AkkaSpec( """akka { loglevel = INFO + log-dead-letters-during-shutdown = false actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty.tcp { hostname = localhost diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala index f9766fb730..d97a2a52de 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -59,6 +59,8 @@ object ThrottlerTransportAdapterSpec { if (received >= MessageCount) controller ! (System.nanoTime() - startTime) } } + + case class Lost(msg: String) } class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout { @@ -96,12 +98,15 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende } "survive blackholing" taggedAs TimingTest in { - here ! "Blackhole 1" - expectMsg("Blackhole 1") + here ! Lost("Blackhole 1") + expectMsg(Lost("Blackhole 1")) + + muteDeadLetters(classOf[Lost])(system) + muteDeadLetters(classOf[Lost])(systemB) throttle(Direction.Both, Blackhole) should be(true) - here ! "Blackhole 2" + here ! Lost("Blackhole 2") expectNoMsg(1.seconds) disassociate() should be(true) expectNoMsg(1.seconds) @@ -110,20 +115,20 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende // after we remove the Blackhole we can't be certain of the state // of the connection, repeat until success - here ! "Blackhole 3" + here ! Lost("Blackhole 3") awaitCond({ - if (receiveOne(Duration.Zero) == "Blackhole 3") + if (receiveOne(Duration.Zero) == Lost("Blackhole 3")) true else { - here ! "Blackhole 3" + here ! Lost("Blackhole 3") false } }, 15.seconds) here ! "Cleanup" fishForMessage(5.seconds) { - case "Cleanup" ⇒ true - case "Blackhole 3" ⇒ false + case "Cleanup" ⇒ true + case Lost("Blackhole 3") ⇒ false } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 2bda99c5e9..cfaff0e7dc 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -66,24 +66,24 @@ class TestFSMRef[S, D, T <: Actor]( } /** - * Proxy for [[FSM#setTimer]]. + * Proxy for [[akka.actor.FSM#setTimer]]. */ def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean = false) { fsm.setTimer(name, msg, timeout, repeat) } /** - * Proxy for [[FSM#cancelTimer]]. + * Proxy for [[akka.actor.FSM#cancelTimer]]. */ def cancelTimer(name: String) { fsm.cancelTimer(name) } /** - * Proxy for [[FSM#isStateTimerActive]]. + * Proxy for [[akka.actor.FSM#isStateTimerActive]]. */ def isTimerActive(name: String) = fsm.isTimerActive(name) /** - * Proxy for [[FSM#isStateTimerActive]]. + * Proxy for [[akka.actor.FSM#isStateTimerActive]]. */ def isStateTimerActive = fsm.isStateTimerActive }