=* #2966 Reduce warnings from compile, test, scaladoc
This commit is contained in:
parent
787c7ca439
commit
615c6b572c
22 changed files with 54 additions and 37 deletions
|
|
@ -12,7 +12,6 @@ import akka.actor._
|
||||||
import akka.testkit.{ EventFilter, AkkaSpec }
|
import akka.testkit.{ EventFilter, AkkaSpec }
|
||||||
import scala.concurrent.{ Future, Await, ExecutionContext }
|
import scala.concurrent.{ Future, Await, ExecutionContext }
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.dispatch.{ UnboundedMailbox, BoundedMailbox, SingleConsumerOnlyUnboundedMailbox }
|
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
|
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
package akka.performance.microbench
|
package akka.performance.microbench
|
||||||
|
|
||||||
|
import language.postfixOps
|
||||||
import akka.performance.workbench.PerformanceSpec
|
import akka.performance.workbench.PerformanceSpec
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
|
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
*/
|
*/
|
||||||
package akka.routing
|
package akka.routing
|
||||||
|
|
||||||
|
import language.postfixOps
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
|
|
||||||
|
|
@ -5,14 +5,17 @@
|
||||||
package akka.dispatch;
|
package akka.dispatch;
|
||||||
|
|
||||||
import akka.util.Unsafe;
|
import akka.util.Unsafe;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
|
* Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
|
||||||
* http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
|
* http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("serial")
|
||||||
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
|
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
|
||||||
// Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
|
// Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
|
||||||
|
@SuppressWarnings("unused")
|
||||||
private volatile Node<T> _tailDoNotCallMeDirectly;
|
private volatile Node<T> _tailDoNotCallMeDirectly;
|
||||||
|
|
||||||
protected AbstractNodeQueue() {
|
protected AbstractNodeQueue() {
|
||||||
|
|
@ -63,7 +66,6 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
|
||||||
/*
|
/*
|
||||||
* !!! There is a copy of this code in pollNode() !!!
|
* !!! There is a copy of this code in pollNode() !!!
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public final T poll() {
|
public final T poll() {
|
||||||
final Node<T> next = peekNode();
|
final Node<T> next = peekNode();
|
||||||
if (next == null) return null;
|
if (next == null) return null;
|
||||||
|
|
@ -106,6 +108,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
|
||||||
|
|
||||||
public static class Node<T> {
|
public static class Node<T> {
|
||||||
public T value;
|
public T value;
|
||||||
|
@SuppressWarnings("unused")
|
||||||
private volatile Node<T> _nextDoNotCallMeDirectly;
|
private volatile Node<T> _nextDoNotCallMeDirectly;
|
||||||
|
|
||||||
public Node() {
|
public Node() {
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.japi.pf;
|
||||||
|
|
||||||
import akka.actor.FSM;
|
import akka.actor.FSM;
|
||||||
import scala.PartialFunction;
|
import scala.PartialFunction;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -14,6 +15,7 @@ import java.util.List;
|
||||||
* @param <S> the state type
|
* @param <S> the state type
|
||||||
* @param <D> the data type
|
* @param <D> the data type
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
public class FSMStateFunctionBuilder<S, D> {
|
public class FSMStateFunctionBuilder<S, D> {
|
||||||
|
|
||||||
private PFBuilder<FSM.Event<D>, FSM.State<S, D>> builder =
|
private PFBuilder<FSM.Event<D>, FSM.State<S, D>> builder =
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ import scala.util.control.NonFatal
|
||||||
object Main {
|
object Main {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @params args one argument: the class of the application supervisor actor
|
* @param args one argument: the class of the application supervisor actor
|
||||||
*/
|
*/
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
if (args.length != 1) {
|
if (args.length != 1) {
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
|
||||||
* Sends the specified message to the sender, i.e. fire-and-forget
|
* Sends the specified message to the sender, i.e. fire-and-forget
|
||||||
* semantics, including the sender reference if possible.
|
* semantics, including the sender reference if possible.
|
||||||
*
|
*
|
||||||
* Pass [[ActorRef#noSender]] or `null` as sender if there is nobody to reply to
|
* Pass [[akka.actor.ActorRef$.noSender]] or `null` as sender if there is nobody to reply to
|
||||||
*/
|
*/
|
||||||
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
|
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
|
||||||
|
|
||||||
|
|
@ -476,7 +476,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
|
||||||
override val path: ActorPath,
|
override val path: ActorPath,
|
||||||
val eventStream: EventStream) extends MinimalActorRef {
|
val eventStream: EventStream) extends MinimalActorRef {
|
||||||
|
|
||||||
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated(): Boolean = true
|
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated = true
|
||||||
|
|
||||||
override def sendSystemMessage(message: SystemMessage): Unit = {
|
override def sendSystemMessage(message: SystemMessage): Unit = {
|
||||||
if (Mailbox.debug) println(s"ELAR $path having enqueued $message")
|
if (Mailbox.debug) println(s"ELAR $path having enqueued $message")
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ object ActorSystem {
|
||||||
/**
|
/**
|
||||||
* Creates a new ActorSystem with the specified name, the specified Config, the specified ClassLoader,
|
* Creates a new ActorSystem with the specified name, the specified Config, the specified ClassLoader,
|
||||||
* and the specified ExecutionContext. The ExecutionContext will be used as the default executor inside this ActorSystem.
|
* and the specified ExecutionContext. The ExecutionContext will be used as the default executor inside this ActorSystem.
|
||||||
* If [[null]] is passed in for the Config, ClassLoader and/or ExecutionContext parameters, the respective default value
|
* If `null` is passed in for the Config, ClassLoader and/or ExecutionContext parameters, the respective default value
|
||||||
* will be used. If no Config is given, the default reference config will be obtained from the ClassLoader.
|
* will be used. If no Config is given, the default reference config will be obtained from the ClassLoader.
|
||||||
* If no ClassLoader is given, it obtains the current ClassLoader by first inspecting the current
|
* If no ClassLoader is given, it obtains the current ClassLoader by first inspecting the current
|
||||||
* threads' getContextClassLoader, then tries to walk the stack to find the callers class loader, then
|
* threads' getContextClassLoader, then tries to walk the stack to find the callers class loader, then
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ trait Routee {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* [[Routee]] that sends the messages to an [[akka.actor.ActorRefRoutee]].
|
* [[Routee]] that sends the messages to an [[akka.actor.ActorRef]].
|
||||||
*/
|
*/
|
||||||
case class ActorRefRoutee(ref: ActorRef) extends Routee {
|
case class ActorRefRoutee(ref: ActorRef) extends Routee {
|
||||||
override def send(message: Any, sender: ActorRef): Unit =
|
override def send(message: Any, sender: ActorRef): Unit =
|
||||||
|
|
|
||||||
|
|
@ -43,9 +43,12 @@ object Helpers {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts a "currentTimeMillis"-obtained timestamp accordingly:
|
* Converts a "currentTimeMillis"-obtained timestamp accordingly:
|
||||||
|
* {{{
|
||||||
* "$hours%02d:$minutes%02d:$seconds%02d.$ms%03dUTC"
|
* "$hours%02d:$minutes%02d:$seconds%02d.$ms%03dUTC"
|
||||||
|
* }}}
|
||||||
|
*
|
||||||
* @param timestamp a "currentTimeMillis"-obtained timestamp
|
* @param timestamp a "currentTimeMillis"-obtained timestamp
|
||||||
* @return A String formatted like: $hours%02d:$minutes%02d:$seconds%02d.$ms%03dUTC
|
* @return the formatted timestamp
|
||||||
*/
|
*/
|
||||||
def currentTimeMillisToUTCString(timestamp: Long): String = {
|
def currentTimeMillisToUTCString(timestamp: Long): String = {
|
||||||
val timeOfDay = timestamp % 86400000L
|
val timeOfDay = timestamp % 86400000L
|
||||||
|
|
|
||||||
|
|
@ -265,7 +265,7 @@ class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extensio
|
||||||
else {
|
else {
|
||||||
val numberOfContacts: Int = config.getInt("number-of-contacts")
|
val numberOfContacts: Int = config.getInt("number-of-contacts")
|
||||||
val responseTunnelReceiveTimeout =
|
val responseTunnelReceiveTimeout =
|
||||||
Duration(config.getMilliseconds("response-tunnel-receive-timeout"), MILLISECONDS)
|
config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis
|
||||||
val name = config.getString("name")
|
val name = config.getString("name")
|
||||||
// important to use val mediator here to activate it outside of ClusterReceptionist constructor
|
// important to use val mediator here to activate it outside of ClusterReceptionist constructor
|
||||||
val mediator = pubSubMediator
|
val mediator = pubSubMediator
|
||||||
|
|
|
||||||
|
|
@ -180,11 +180,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
||||||
}
|
}
|
||||||
val HasNecessaryClusterRole: Boolean = Role.forall(cluster.selfRoles.contains)
|
val HasNecessaryClusterRole: Boolean = Role.forall(cluster.selfRoles.contains)
|
||||||
val GuardianName: String = config.getString("guardian-name")
|
val GuardianName: String = config.getString("guardian-name")
|
||||||
val RetryInterval: FiniteDuration = Duration(config.getMilliseconds("retry-interval"), MILLISECONDS)
|
val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis
|
||||||
val BufferSize: Int = config.getInt("buffer-size")
|
val BufferSize: Int = config.getInt("buffer-size")
|
||||||
val HandOffTimeout: FiniteDuration = Duration(config.getMilliseconds("handoff-timeout"), MILLISECONDS)
|
val HandOffTimeout: FiniteDuration = config.getDuration("handoff-timeout", MILLISECONDS).millis
|
||||||
val RebalanceInterval: FiniteDuration = Duration(config.getMilliseconds("rebalance-interval"), MILLISECONDS)
|
val RebalanceInterval: FiniteDuration = config.getDuration("rebalance-interval", MILLISECONDS).millis
|
||||||
val SnapshotInterval: FiniteDuration = Duration(config.getMilliseconds("snapshot-interval"), MILLISECONDS)
|
val SnapshotInterval: FiniteDuration = config.getDuration("snapshot-interval", MILLISECONDS).millis
|
||||||
val LeastShardAllocationRebalanceThreshold: Int =
|
val LeastShardAllocationRebalanceThreshold: Int =
|
||||||
config.getInt("least-shard-allocation-strategy.rebalance-threshold")
|
config.getInt("least-shard-allocation-strategy.rebalance-threshold")
|
||||||
val LeastShardAllocationMaxSimultaneousRebalance: Int =
|
val LeastShardAllocationMaxSimultaneousRebalance: Int =
|
||||||
|
|
@ -868,7 +868,7 @@ object ShardCoordinator {
|
||||||
* @param shardId the id of the shard to allocate
|
* @param shardId the id of the shard to allocate
|
||||||
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
||||||
* in the order they were allocated
|
* in the order they were allocated
|
||||||
* @retur the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
||||||
* the references included in the `currentShardAllocations` parameter
|
* the references included in the `currentShardAllocations` parameter
|
||||||
*/
|
*/
|
||||||
def allocateShard(requester: ActorRef, shardId: ShardId,
|
def allocateShard(requester: ActorRef, shardId: ShardId,
|
||||||
|
|
@ -910,7 +910,7 @@ object ShardCoordinator {
|
||||||
* @param shardId the id of the shard to allocate
|
* @param shardId the id of the shard to allocate
|
||||||
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
|
||||||
* in the order they were allocated
|
* in the order they were allocated
|
||||||
* @retur the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
|
||||||
* the references included in the `currentShardAllocations` parameter
|
* the references included in the `currentShardAllocations` parameter
|
||||||
*/
|
*/
|
||||||
def allocateShard(requester: ActorRef, shardId: String,
|
def allocateShard(requester: ActorRef, shardId: String,
|
||||||
|
|
|
||||||
|
|
@ -501,8 +501,8 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension
|
||||||
case "broadcast" ⇒ BroadcastRoutingLogic()
|
case "broadcast" ⇒ BroadcastRoutingLogic()
|
||||||
case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]")
|
case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]")
|
||||||
}
|
}
|
||||||
val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS)
|
val gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis
|
||||||
val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS)
|
val removedTimeToLive = config.getDuration("removed-time-to-live", MILLISECONDS).millis
|
||||||
val maxDeltaElements = config.getInt("max-delta-elements")
|
val maxDeltaElements = config.getInt("max-delta-elements")
|
||||||
val name = config.getString("name")
|
val name = config.getString("name")
|
||||||
system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements),
|
system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements),
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ case object Update {
|
||||||
* view can also run on a different node, provided that a replicated journal is used. Implementation
|
* view can also run on a different node, provided that a replicated journal is used. Implementation
|
||||||
* classes reference a processor by implementing `processorId`.
|
* classes reference a processor by implementing `processorId`.
|
||||||
*
|
*
|
||||||
* Views can also store snapshots of internal state by calling [[saveSnapshot]]. The snapshots of a view
|
* Views can also store snapshots of internal state by calling [[#saveSnapshot]]. The snapshots of a view
|
||||||
* are independent of those of the referenced processor. During recovery, a saved snapshot is offered
|
* are independent of those of the referenced processor. During recovery, a saved snapshot is offered
|
||||||
* to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger
|
* to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger
|
||||||
* than the snapshot. Default is to offer the latest saved snapshot.
|
* than the snapshot. Default is to offer the latest saved snapshot.
|
||||||
|
|
@ -63,9 +63,8 @@ case object Update {
|
||||||
* `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional
|
* `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional
|
||||||
* view updates by sending the view [[Update]] requests. See also methods
|
* view updates by sending the view [[Update]] requests. See also methods
|
||||||
*
|
*
|
||||||
* - [[autoUpdate]] for turning automated updates on or off
|
* - [[#autoUpdate]] for turning automated updates on or off
|
||||||
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle and
|
* - [[#autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
|
||||||
* - [[autoRecoveryReplayMax]] for limiting the number of replayed messages on initial view recovery
|
|
||||||
*
|
*
|
||||||
* Views can also use channels to communicate with destinations in the same way as processors can do.
|
* Views can also use channels to communicate with destinations in the same way as processors can do.
|
||||||
*/
|
*/
|
||||||
|
|
@ -174,7 +173,7 @@ trait View extends Actor with Recovery {
|
||||||
viewSettings.autoUpdateReplayMax
|
viewSettings.autoUpdateReplayMax
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoRecoveryReplayMax`
|
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
|
||||||
* messages (following that snapshot).
|
* messages (following that snapshot).
|
||||||
*/
|
*/
|
||||||
override def preStart(): Unit = {
|
override def preStart(): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ final class RemoteSettings(val config: Config) {
|
||||||
} requiring (_ > 0, "system-message-buffer-size must be > 0")
|
} requiring (_ > 0, "system-message-buffer-size must be > 0")
|
||||||
|
|
||||||
val QuarantineDuration: FiniteDuration = {
|
val QuarantineDuration: FiniteDuration = {
|
||||||
Duration(getMilliseconds("akka.remote.prune-quarantine-marker-after"), MILLISECONDS).requiring(_ > Duration.Zero,
|
config.getMillisDuration("akka.remote.prune-quarantine-marker-after").requiring(_ > Duration.Zero,
|
||||||
"prune-quarantine-marker-after must be > 0 ms")
|
"prune-quarantine-marker-after must be > 0 ms")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -221,6 +221,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
||||||
protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode
|
protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode
|
||||||
|
|
||||||
// Not used anywhere only to keep compatibility with RemoteTransport interface
|
// Not used anywhere only to keep compatibility with RemoteTransport interface
|
||||||
|
@deprecated("Use the LogRemoteLifecycleEvents setting instead.", "2.3")
|
||||||
protected def logRemoteLifeCycleEvents: Boolean = LogRemoteLifecycleEvents
|
protected def logRemoteLifeCycleEvents: Boolean = LogRemoteLifecycleEvents
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -141,9 +141,8 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS
|
||||||
// Write once variable initialized when Listen is called.
|
// Write once variable initialized when Listen is called.
|
||||||
@volatile protected var manager: ActorRef = _
|
@volatile protected var manager: ActorRef = _
|
||||||
|
|
||||||
// FIXME #3074 how to replace actorFor here?
|
|
||||||
private def registerManager(): Future[ActorRef] =
|
private def registerManager(): Future[ActorRef] =
|
||||||
(system.actorFor("/system/transports") ? RegisterTransportActor(managerProps, managerName)).mapTo[ActorRef]
|
(system.actorSelection("/system/transports") ? RegisterTransportActor(managerProps, managerName)).mapTo[ActorRef]
|
||||||
|
|
||||||
override def interceptListen(listenAddress: Address,
|
override def interceptListen(listenAddress: Address,
|
||||||
listenerPromise: Future[AssociationEventListener]): Future[AssociationEventListener] = {
|
listenerPromise: Future[AssociationEventListener]): Future[AssociationEventListener] = {
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ class DaemonicSpec extends AkkaSpec {
|
||||||
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|
akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|
||||||
akka.remote.netty.tcp.port = 0
|
akka.remote.netty.tcp.port = 0
|
||||||
|
akka.log-dead-letters-during-shutdown = off
|
||||||
"""))
|
"""))
|
||||||
|
|
||||||
val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort))
|
val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort))
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,8 @@ akka {
|
||||||
shutdown(other)
|
shutdown(other)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def expectedTestDuration: FiniteDuration = 90.seconds
|
||||||
|
|
||||||
"receive Terminated when watched node is unknown host" in {
|
"receive Terminated when watched node is unknown host" in {
|
||||||
val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject"
|
val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject"
|
||||||
system.actorOf(Props(new Actor {
|
system.actorOf(Props(new Actor {
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,7 @@ object RemoteWatcherSpec {
|
||||||
class RemoteWatcherSpec extends AkkaSpec(
|
class RemoteWatcherSpec extends AkkaSpec(
|
||||||
"""akka {
|
"""akka {
|
||||||
loglevel = INFO
|
loglevel = INFO
|
||||||
|
log-dead-letters-during-shutdown = false
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
remote.netty.tcp {
|
remote.netty.tcp {
|
||||||
hostname = localhost
|
hostname = localhost
|
||||||
|
|
|
||||||
|
|
@ -59,6 +59,8 @@ object ThrottlerTransportAdapterSpec {
|
||||||
if (received >= MessageCount) controller ! (System.nanoTime() - startTime)
|
if (received >= MessageCount) controller ! (System.nanoTime() - startTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case class Lost(msg: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {
|
class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {
|
||||||
|
|
@ -96,12 +98,15 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
|
||||||
}
|
}
|
||||||
|
|
||||||
"survive blackholing" taggedAs TimingTest in {
|
"survive blackholing" taggedAs TimingTest in {
|
||||||
here ! "Blackhole 1"
|
here ! Lost("Blackhole 1")
|
||||||
expectMsg("Blackhole 1")
|
expectMsg(Lost("Blackhole 1"))
|
||||||
|
|
||||||
|
muteDeadLetters(classOf[Lost])(system)
|
||||||
|
muteDeadLetters(classOf[Lost])(systemB)
|
||||||
|
|
||||||
throttle(Direction.Both, Blackhole) should be(true)
|
throttle(Direction.Both, Blackhole) should be(true)
|
||||||
|
|
||||||
here ! "Blackhole 2"
|
here ! Lost("Blackhole 2")
|
||||||
expectNoMsg(1.seconds)
|
expectNoMsg(1.seconds)
|
||||||
disassociate() should be(true)
|
disassociate() should be(true)
|
||||||
expectNoMsg(1.seconds)
|
expectNoMsg(1.seconds)
|
||||||
|
|
@ -110,20 +115,20 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
|
||||||
|
|
||||||
// after we remove the Blackhole we can't be certain of the state
|
// after we remove the Blackhole we can't be certain of the state
|
||||||
// of the connection, repeat until success
|
// of the connection, repeat until success
|
||||||
here ! "Blackhole 3"
|
here ! Lost("Blackhole 3")
|
||||||
awaitCond({
|
awaitCond({
|
||||||
if (receiveOne(Duration.Zero) == "Blackhole 3")
|
if (receiveOne(Duration.Zero) == Lost("Blackhole 3"))
|
||||||
true
|
true
|
||||||
else {
|
else {
|
||||||
here ! "Blackhole 3"
|
here ! Lost("Blackhole 3")
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}, 15.seconds)
|
}, 15.seconds)
|
||||||
|
|
||||||
here ! "Cleanup"
|
here ! "Cleanup"
|
||||||
fishForMessage(5.seconds) {
|
fishForMessage(5.seconds) {
|
||||||
case "Cleanup" ⇒ true
|
case "Cleanup" ⇒ true
|
||||||
case "Blackhole 3" ⇒ false
|
case Lost("Blackhole 3") ⇒ false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -66,24 +66,24 @@ class TestFSMRef[S, D, T <: Actor](
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Proxy for [[FSM#setTimer]].
|
* Proxy for [[akka.actor.FSM#setTimer]].
|
||||||
*/
|
*/
|
||||||
def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean = false) {
|
def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean = false) {
|
||||||
fsm.setTimer(name, msg, timeout, repeat)
|
fsm.setTimer(name, msg, timeout, repeat)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Proxy for [[FSM#cancelTimer]].
|
* Proxy for [[akka.actor.FSM#cancelTimer]].
|
||||||
*/
|
*/
|
||||||
def cancelTimer(name: String) { fsm.cancelTimer(name) }
|
def cancelTimer(name: String) { fsm.cancelTimer(name) }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Proxy for [[FSM#isStateTimerActive]].
|
* Proxy for [[akka.actor.FSM#isStateTimerActive]].
|
||||||
*/
|
*/
|
||||||
def isTimerActive(name: String) = fsm.isTimerActive(name)
|
def isTimerActive(name: String) = fsm.isTimerActive(name)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Proxy for [[FSM#isStateTimerActive]].
|
* Proxy for [[akka.actor.FSM#isStateTimerActive]].
|
||||||
*/
|
*/
|
||||||
def isStateTimerActive = fsm.isStateTimerActive
|
def isStateTimerActive = fsm.isStateTimerActive
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue