diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7809717ad3..c1459cac83 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -254,7 +254,13 @@ In order to speed up PR validation times, the Akka build contains a special sbt which is smart enough to figure out which projects should be built if a PR only has changes in some parts of the project. For example, if your PR only touches `akka-persistence`, no `akka-remote` tests need to be run, however the task will validate all projects that depend on `akka-persistence` (including samples). -Also, tests tagged as `PerformanceTest` and the likes of it are excluded from PR validation. +Also, tests tagged as `PerformanceTest`, `TimingTest`, `LongRunningTest` and all multi-node tests are excluded from PR validation. + +You can exclude the same kind of tests in your local build by starting sbt with: + +``` +sbt -Dakka.test.tags.exclude=performance,timing,long-running -Dakka.test.multi-in-test=false +``` In order to force the `validatePullRequest` task to build the entire project, regardless of dependency analysis of a PRs changes one can use the special `PLS BUILD ALL` command (typed in a comment on Github, on the Pull Request), which will cause diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala index cae17727fe..122003f695 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala @@ -11,9 +11,27 @@ import akka.testkit.metrics._ import org.scalatest.BeforeAndAfterAll import akka.testkit.metrics.HeapMemoryUsage import com.codahale.metrics.{ Histogram } +import com.typesafe.config.ConfigFactory object ActorCreationPerfSpec { + val config = ConfigFactory.parseString(""" + akka.test.actor.ActorPerfSpec { + warmUp = 5 + numberOfActors = 10 + numberOfRepeats = 1 + force-gc = off + report-metrics = off + # For serious measurements use something like the following values + #warmUp = 50000 + #numberOfActors = 100000 + #numberOfRepeats = 3 + #force-gc = on + #report-metrics = on + } + akka.actor.serialize-messages = off + """) + final case class Create(number: Int, props: () ⇒ Props) case object Created case object IsAlive @@ -98,7 +116,7 @@ object ActorCreationPerfSpec { } } -class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender +class ActorCreationPerfSpec extends AkkaSpec(ActorCreationPerfSpec.config) with ImplicitSender with MetricsKit with BeforeAndAfterAll { import ActorCreationPerfSpec._ @@ -108,9 +126,11 @@ class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = of val BlockingTimeKey = ActorCreationKey / "synchronous-part" val TotalTimeKey = ActorCreationKey / "total" - val warmUp: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.warmUp", 50000) - val nrOfActors: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfActors", 100000) - val nrOfRepeats: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfRepeats", 3) + val warmUp = metricsConfig.getInt("akka.test.actor.ActorPerfSpec.warmUp") + val nrOfActors = metricsConfig.getInt("akka.test.actor.ActorPerfSpec.numberOfActors") + val nrOfRepeats = metricsConfig.getInt("akka.test.actor.ActorPerfSpec.numberOfRepeats") + override val reportMetricsEnabled = metricsConfig.getBoolean("akka.test.actor.ActorPerfSpec.report-metrics") + override val forceGcEnabled = metricsConfig.getBoolean("akka.test.actor.ActorPerfSpec.force-gc") def runWithCounterInside(metricName: String, scenarioName: String, number: Int, propsCreator: () ⇒ Props) { val hist = histogram(BlockingTimeKey / metricName) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 2a8c425d5e..22f278c18e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -12,6 +12,7 @@ import akka.event.Logging.Warning import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import java.util.concurrent.TimeoutException +import akka.testkit.TimingTest class ActorDSLDummy { //#import @@ -79,7 +80,7 @@ class ActorDSLSpec extends AkkaSpec { i.receive() should ===("hello") } - "have a maximum queue size" in { + "have a maximum queue size" taggedAs TimingTest in { val i = inbox() system.eventStream.subscribe(testActor, classOf[Warning]) try { @@ -101,7 +102,7 @@ class ActorDSLSpec extends AkkaSpec { } } - "have a default and custom timeouts" in { + "have a default and custom timeouts" taggedAs TimingTest in { val i = inbox() within(5 seconds, 6 seconds) { intercept[TimeoutException](i.receive()) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala index f6f2fdbada..3011f872c6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala @@ -35,7 +35,7 @@ object ActorSelectionSpec { } -class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTimeout { +class ActorSelectionSpec extends AkkaSpec with DefaultTimeout { import ActorSelectionSpec._ val c1 = system.actorOf(p, "c1") @@ -298,7 +298,7 @@ class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTim case `c2` ⇒ lastSender } actors should ===(Seq(c21)) - expectNoMsg(1 second) + expectNoMsg(200.millis) } "resolve one actor with explicit timeout" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 0fe9f27bb0..c21abb7e59 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -339,18 +339,13 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend val system2 = ActorSystem(name = "default", config = Some(config), defaultExecutionContext = Some(ec)) try { - val ref = system2.actorOf(Props(new Actor { - def receive = { - case "ping" ⇒ sender() ! "pong" - } - })) - + val ref = system2.actorOf(TestActors.echoActorProps) val probe = TestProbe() ref.tell("ping", probe.ref) - ecProbe.expectNoMsg() - probe.expectMsg(1.second, "pong") + ecProbe.expectNoMsg(200.millis) + probe.expectMsg(1.second, "ping") } finally { shutdown(system2) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala index 691e3002d7..b063d17cd4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala @@ -60,7 +60,7 @@ class ConsistencySpec extends AkkaSpec(ConsistencySpec.config) { val props = Props[ConsistencyCheckingActor].withDispatcher("consistency-dispatcher") val actors = Vector.fill(noOfActors)(system.actorOf(props)) - for (i ← 0L until 100000L) { + for (i ← 0L until 10000L) { actors.foreach(_.tell(i, testActor)) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index d97da96b9f..b5e41320f8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -337,7 +337,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" → true)) with I expectMsg(Transition(fsmref, 0, 1)) } - "allow cancelling stateTimeout by issuing forMax(Duration.Inf)" in { + "allow cancelling stateTimeout by issuing forMax(Duration.Inf)" taggedAs TimingTest in { val sys = ActorSystem("fsmEvent") val p = TestProbe()(sys) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 2ada2abda8..f58dc4cf43 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -21,7 +21,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { "An actor with receive timeout" must { - "get timeout" in { + "get timeout" taggedAs TimingTest in { val timeoutLatch = TestLatch() val timeoutActor = system.actorOf(Props(new Actor { @@ -36,7 +36,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { system.stop(timeoutActor) } - "reschedule timeout after regular receive" in { + "reschedule timeout after regular receive" taggedAs TimingTest in { val timeoutLatch = TestLatch() val timeoutActor = system.actorOf(Props(new Actor { @@ -54,7 +54,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { system.stop(timeoutActor) } - "be able to turn off timeout if desired" in { + "be able to turn off timeout if desired" taggedAs TimingTest in { val count = new AtomicInteger(0) val timeoutLatch = TestLatch() @@ -77,7 +77,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { system.stop(timeoutActor) } - "not receive timeout message when not specified" in { + "not receive timeout message when not specified" taggedAs TimingTest in { val timeoutLatch = TestLatch() val timeoutActor = system.actorOf(Props(new Actor { @@ -90,7 +90,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { system.stop(timeoutActor) } - "get timeout while receiving NotInfluenceReceiveTimeout messages" in { + "get timeout while receiving NotInfluenceReceiveTimeout messages" taggedAs TimingTest in { val timeoutLatch = TestLatch() val timeoutActor = system.actorOf(Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index bdeea269a9..ed1dbfa942 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -152,7 +152,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit ticks.get should ===(1) } - "be canceled if cancel is performed before execution" in { + "be canceled if cancel is performed before execution" taggedAs TimingTest in { val task = collectCancellable(system.scheduler.scheduleOnce(10 seconds)(())) task.cancel() should ===(true) task.isCancelled should ===(true) @@ -160,7 +160,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit task.isCancelled should ===(true) } - "not be canceled if cancel is performed after execution" in { + "not be canceled if cancel is performed after execution" taggedAs TimingTest in { val latch = TestLatch(1) val task = collectCancellable(system.scheduler.scheduleOnce(10 millis)(latch.countDown())) Await.ready(latch, remainingOrDefault) @@ -312,7 +312,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev "A LightArrayRevolverScheduler" must { - "reject tasks scheduled too far into the future" in { + "reject tasks scheduled too far into the future" taggedAs TimingTest in { val maxDelay = tickDuration * Int.MaxValue import system.dispatcher system.scheduler.scheduleOnce(maxDelay, testActor, "OK") @@ -321,7 +321,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "reject periodic tasks scheduled too far into the future" in { + "reject periodic tasks scheduled too far into the future" taggedAs TimingTest in { val maxDelay = tickDuration * Int.MaxValue import system.dispatcher system.scheduler.schedule(maxDelay, 1.second, testActor, "OK") @@ -330,7 +330,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "reject periodic tasks scheduled with too long interval" in { + "reject periodic tasks scheduled with too long interval" taggedAs TimingTest in { val maxDelay = tickDuration * Int.MaxValue import system.dispatcher system.scheduler.schedule(100.millis, maxDelay, testActor, "OK") @@ -373,7 +373,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev expectNoMsg(1.second) } - "survive vicious enqueueing" in { + "survive vicious enqueueing" taggedAs TimingTest in { withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver) ⇒ import driver._ import system.dispatcher @@ -396,7 +396,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "execute multiple jobs at once when expiring multiple buckets" in { + "execute multiple jobs at once when expiring multiple buckets" taggedAs TimingTest in { withScheduler() { (sched, driver) ⇒ implicit def ec = localEC import driver._ @@ -411,7 +411,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "properly defer jobs even when the timer thread oversleeps" in { + "properly defer jobs even when the timer thread oversleeps" taggedAs TimingTest in { withScheduler() { (sched, driver) ⇒ implicit def ec = localEC import driver._ @@ -426,7 +426,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "correctly wrap around wheel rounds" in { + "correctly wrap around wheel rounds" taggedAs TimingTest in { withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver) ⇒ implicit def ec = localEC import driver._ @@ -453,7 +453,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "correctly execute jobs when clock wraps around" in { + "correctly execute jobs when clock wraps around" taggedAs TimingTest in { withScheduler(Long.MaxValue - 200000000L) { (sched, driver) ⇒ implicit def ec = localEC import driver._ @@ -480,7 +480,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "correctly wrap around ticks" in { + "correctly wrap around ticks" taggedAs TimingTest in { val numEvents = 40 val targetTicks = Int.MaxValue - numEvents + 20 @@ -507,7 +507,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev } } - "reliably reject jobs when shutting down" in { + "reliably reject jobs when shutting down" taggedAs TimingTest in { withScheduler() { (sched, driver) ⇒ import system.dispatcher val counter = new AtomicInteger diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 65586df6e2..bb4cda124e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -24,6 +24,7 @@ import akka.event.Logging import java.util.concurrent.atomic.AtomicInteger import java.lang.System.identityHashCode import akka.util.Helpers.ConfigOps +import akka.testkit.LongRunningTest object SupervisorHierarchySpec { import FSM.`→` @@ -728,7 +729,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w "A Supervisor Hierarchy" must { - "restart manager and workers in AllForOne" in { + "restart manager and workers in AllForOne" taggedAs LongRunningTest in { val countDown = new CountDownLatch(4) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Exception]))))) @@ -749,7 +750,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w } } - "send notification to supervisor when permanent failure" in { + "send notification to supervisor when permanent failure" taggedAs LongRunningTest in { val countDownMessages = new CountDownLatch(1) val countDownMax = new CountDownLatch(1) val boss = system.actorOf(Props(new Actor { @@ -773,7 +774,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w } } - "resume children after Resume" in { + "resume children after Resume" taggedAs LongRunningTest in { val boss = system.actorOf(Props[Resumer], "resumer") boss ! "spawn" val middle = expectMsgType[ActorRef] @@ -790,7 +791,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w expectMsg("pong") } - "suspend children while failing" in { + "suspend children while failing" taggedAs LongRunningTest in { val latch = TestLatch() val slowResumer = system.actorOf(Props(new Actor { override def supervisorStrategy = OneForOneStrategy() { case _ ⇒ Await.ready(latch, 4.seconds.dilated); SupervisorStrategy.Resume } @@ -816,7 +817,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w expectMsg("pong") } - "handle failure in creation when supervision startegy returns Resume and Restart" in { + "handle failure in creation when supervision startegy returns Resume and Restart" taggedAs LongRunningTest in { val createAttempt = new AtomicInteger(0) val preStartCalled = new AtomicInteger(0) val postRestartCalled = new AtomicInteger(0) @@ -866,7 +867,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w postRestartCalled.get should ===(0) } - "survive being stressed" in { + "survive being stressed" taggedAs LongRunningTest in { system.eventStream.publish(Mute( EventFilter[Failure](), EventFilter.warning("Failure"), diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala index e26e2e5ba9..633d450843 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala @@ -80,7 +80,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC } "A cluster router with a consistent hashing pool" must { - "start cluster with 2 nodes" taggedAs LongRunningTest in { + "start cluster with 2 nodes" in { awaitClusterUp(first, second) enterBarrier("after-1") } @@ -105,7 +105,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-2") } - "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in { + "deploy routees to new member nodes in the cluster" in { awaitClusterUp(first, second, third) @@ -119,7 +119,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-3") } - "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { + "deploy programatically defined routees to the member nodes in the cluster" in { runOn(first) { val router2 = system.actorOf( ClusterRouterPool( @@ -136,7 +136,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-4") } - "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in { + "handle combination of configured router and programatically defined hashMapping" in { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s @@ -150,7 +150,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC enterBarrier("after-5") } - "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in { + "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" in { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s diff --git a/akka-cluster/src/test/scala/akka/cluster/AutoDownSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AutoDownSpec.scala index f0baa013f1..4355dd6b91 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AutoDownSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AutoDownSpec.scala @@ -13,6 +13,7 @@ import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.remote.RARP import akka.testkit.AkkaSpec +import akka.testkit.TimingTest object AutoDownSpec { final case class DownCalled(address: Address) @@ -92,7 +93,7 @@ class AutoDownSpec extends AkkaSpec("akka.actor.provider=remote") { expectMsg(DownCalled(memberC.address)) } - "not down unreachable when losing leadership inbetween detection and specified duration" in { + "not down unreachable when losing leadership inbetween detection and specified duration" taggedAs TimingTest in { val a = autoDownActor(2.seconds) a ! LeaderChanged(Some(memberA.address)) a ! UnreachableMember(memberC) @@ -100,7 +101,7 @@ class AutoDownSpec extends AkkaSpec("akka.actor.provider=remote") { expectNoMsg(3.second) } - "not down when unreachable become reachable inbetween detection and specified duration" in { + "not down when unreachable become reachable inbetween detection and specified duration" taggedAs TimingTest in { val a = autoDownActor(2.seconds) a ! LeaderChanged(Some(memberA.address)) a ! UnreachableMember(memberB) @@ -108,7 +109,7 @@ class AutoDownSpec extends AkkaSpec("akka.actor.provider=remote") { expectNoMsg(3.second) } - "not down when unreachable is removed inbetween detection and specified duration" in { + "not down when unreachable is removed inbetween detection and specified duration" taggedAs TimingTest in { val a = autoDownActor(2.seconds) a ! LeaderChanged(Some(memberA.address)) a ! UnreachableMember(memberB) diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala index fe4c65b787..e06ace7df0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala @@ -10,7 +10,8 @@ import akka.actor.Address class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers { val nodesSize = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.nodesSize").getOrElse("250").toInt - val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("10000").toInt + // increase for serious measurements + val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("1000").toInt def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = { val nodes = (1 to size).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala index ff427d8296..68ba692848 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala @@ -10,7 +10,8 @@ import akka.actor.Address class ReachabilityPerfSpec extends WordSpec with Matchers { val nodesSize = sys.props.get("akka.cluster.ReachabilityPerfSpec.nodesSize").getOrElse("250").toInt - val iterations = sys.props.get("akka.cluster.ReachabilityPerfSpec.iterations").getOrElse("10000").toInt + // increase for serious measurements + val iterations = sys.props.get("akka.cluster.ReachabilityPerfSpec.iterations").getOrElse("100").toInt val address = Address("akka.tcp", "sys", "a", 2552) val node = Address("akka.tcp", "sys", "a", 2552) diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala index 421c708954..80d5db96f4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala @@ -32,7 +32,8 @@ class VectorClockPerfSpec extends WordSpec with Matchers { import VectorClockPerfSpec._ val clockSize = sys.props.get("akka.cluster.VectorClockPerfSpec.clockSize").getOrElse("1000").toInt - val iterations = sys.props.get("akka.cluster.VectorClockPerfSpec.iterations").getOrElse("10000").toInt + // increase for serious measurements + val iterations = sys.props.get("akka.cluster.VectorClockPerfSpec.iterations").getOrElse("1000").toInt val (vcBefore, nodes) = createVectorClockOfSize(clockSize) val firstNode = nodes.head diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index d9b69fa2ad..05103e2967 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -62,7 +62,7 @@ object PersistenceDocSpec { trait MyPersistentActor5 extends PersistentActor { //#recovery-no-snap - override def recovery = + override def recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None) //#recovery-no-snap } diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index 43756e8139..8f7405b809 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -11,11 +11,11 @@ import akka.actor._ import akka.testkit._ object PerformanceSpec { - // multiply cycles with 200 for more - // accurate throughput measurements val config = """ - akka.persistence.performance.cycles.load = 1000 + akka.persistence.performance.cycles.load = 100 + # more accurate throughput measurements + #akka.persistence.performance.cycles.load = 200000 """ case object StopMeasure diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 6be790e272..3bb1bc2c4c 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -161,7 +161,7 @@ class RemoteWatcherSpec extends AkkaSpec( expectNoMsg(2 seconds) } - "generate AddressTerminated when missing heartbeats" in { + "generate AddressTerminated when missing heartbeats" taggedAs LongRunningTest in { val p = TestProbe() val q = TestProbe() system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) @@ -198,7 +198,7 @@ class RemoteWatcherSpec extends AkkaSpec( expectNoMsg(2 seconds) } - "generate AddressTerminated when missing first heartbeat" in { + "generate AddressTerminated when missing first heartbeat" taggedAs LongRunningTest in { val p = TestProbe() val q = TestProbe() system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) @@ -234,7 +234,7 @@ class RemoteWatcherSpec extends AkkaSpec( expectNoMsg(2 seconds) } - "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in { + "generate AddressTerminated for new watch after broken connection that was re-established and broken again" taggedAs LongRunningTest in { val p = TestProbe() val q = TestProbe() system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala index dc236b272f..f16edcd9c5 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala @@ -161,7 +161,7 @@ class RemoteWatcherSpec extends AkkaSpec( expectNoMsg(2 seconds) } - "generate AddressTerminated when missing heartbeats" in { + "generate AddressTerminated when missing heartbeats" taggedAs LongRunningTest in { val p = TestProbe() val q = TestProbe() system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) @@ -198,7 +198,7 @@ class RemoteWatcherSpec extends AkkaSpec( expectNoMsg(2 seconds) } - "generate AddressTerminated when missing first heartbeat" in { + "generate AddressTerminated when missing first heartbeat" taggedAs LongRunningTest in { val p = TestProbe() val q = TestProbe() system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) @@ -234,7 +234,7 @@ class RemoteWatcherSpec extends AkkaSpec( expectNoMsg(2 seconds) } - "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in { + "generate AddressTerminated for new watch after broken connection that was re-established and broken again" taggedAs LongRunningTest in { val p = TestProbe() val q = TestProbe() system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index cebf7e8cc0..c5325eb1f4 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -177,7 +177,7 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String) override def beforeTermination() { system.eventStream.publish(TestEvent.Mute( - EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"), + EventFilter.warning(source = s"akka://AkkaProtocolStressTest/user/$$a", start = "received dead letter"), EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) systemB.eventStream.publish(TestEvent.Mute( EventFilter[EndpointException](), diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index ff7895fe71..93ef2cea37 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -6,6 +6,7 @@ package akka.stream.impl.fusing import akka.stream.impl.ConstantFun import akka.stream.Supervision import akka.stream.testkit.StreamSpec +import akka.testkit.LongRunningTest class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit { import Supervision.stoppingDecider @@ -23,7 +24,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit { "Interpreter" must { - "work with a massive chain of maps" in new OneBoundedSetup[Int](Vector.fill(chainLength)(map): _*) { + "work with a massive chain of maps" taggedAs LongRunningTest in new OneBoundedSetup[Int](Vector.fill(chainLength)(map): _*) { lastEvents() should be(Set.empty) val tstamp = System.nanoTime() @@ -45,7 +46,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit { info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") } - "work with a massive chain of maps with early complete" in new OneBoundedSetup[Int]( + "work with a massive chain of maps with early complete" taggedAs LongRunningTest in new OneBoundedSetup[Int]( Vector.fill(halfLength)(map) ++ Seq(takeHalfOfRepetition) ++ Vector.fill(halfLength)(map): _*) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 6fe562b201..88ac93071d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import scala.concurrent.duration._ import akka.actor.{ Actor, ActorRef, Props } import akka.stream.ActorMaterializer import akka.stream.Attributes.inputBuffer @@ -100,7 +101,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { expectMsg(initMessage) publisher.sendNext(1) - expectNoMsg() + expectNoMsg(200.millis) fw ! TriggerAckMessage expectMsg(1) @@ -150,7 +151,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { expectMsg(1) fw ! TriggerAckMessage - expectNoMsg() // Ack received but buffer empty + expectNoMsg(200.millis) // Ack received but buffer empty publisher.sendNext(2) // Buffer this value fw ! TriggerAckMessage diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index adcc979174..fda2b7443f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -14,6 +14,7 @@ import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.stream.ThrottleMode +import akka.testkit.TimingTest class FlowDelaySpec extends StreamSpec { @@ -21,13 +22,13 @@ class FlowDelaySpec extends StreamSpec { "A Delay" must { - "deliver elements with some time shift" in { + "deliver elements with some time shift" taggedAs TimingTest in { Await.result( Source(1 to 10).delay(1.seconds).grouped(100).runWith(Sink.head), 1200.millis) should ===(1 to 10) } - "add delay to initialDelay if exists upstream" in { + "add delay to initialDelay if exists upstream" taggedAs TimingTest in { Source(1 to 10).initialDelay(1.second).delay(1.second).runWith(TestSink.probe[Int]) .request(10) .expectNoMsg(1800.millis) @@ -127,7 +128,7 @@ class FlowDelaySpec extends StreamSpec { pSub.sendError(new RuntimeException() with NoStackTrace) } - "properly delay according to buffer size" in { + "properly delay according to buffer size" taggedAs TimingTest in { import akka.pattern.pipe import system.dispatcher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala index 3c8e1fdee1..4d2b524e0f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala @@ -16,6 +16,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import scala.concurrent.duration._ import scala.concurrent.{ Await, Future } import scala.util.control.NoStackTrace +import akka.testkit.LongRunningTest class FlowFoldAsyncSpec extends StreamSpec { implicit val materializer = ActorMaterializer() @@ -43,7 +44,7 @@ class FlowFoldAsyncSpec extends StreamSpec { inputSource.runWith(foldSink).futureValue(timeout) should ===(expected) } - "work when using Flow.foldAsync" in assertAllStagesStopped { + "work when using Flow.foldAsync" taggedAs LongRunningTest in assertAllStagesStopped { val flowTimeout = Timeout((flowDelayMS * input.size).milliseconds + 3.seconds) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala index 52bb3549d7..87dfc92e4d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -11,6 +11,7 @@ import akka.stream.testkit._ import akka.stream.testkit.Utils._ import scala.concurrent.Await +import akka.testkit.TimingTest class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { @@ -20,7 +21,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { "A GroupedWithin" must { - "group elements within the duration" in assertAllStagesStopped { + "group elements within the duration" taggedAs TimingTest in assertAllStagesStopped { val input = Iterator.from(1) val p = TestPublisher.manualProbe[Int]() val c = TestSubscriber.manualProbe[immutable.Seq[Int]]() @@ -47,7 +48,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { c.expectNoMsg(200.millis) } - "deliver bufferd elements onComplete before the timeout" in { + "deliver bufferd elements onComplete before the timeout" taggedAs TimingTest in { val c = TestSubscriber.manualProbe[immutable.Seq[Int]]() Source(1 to 3).groupedWithin(1000, 10.second).to(Sink.fromSubscriber(c)).run() val cSub = c.expectSubscription @@ -57,7 +58,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { c.expectNoMsg(200.millis) } - "buffer groups until requested from downstream" in { + "buffer groups until requested from downstream" taggedAs TimingTest in { val input = Iterator.from(1) val p = TestPublisher.manualProbe[Int]() val c = TestSubscriber.manualProbe[immutable.Seq[Int]]() @@ -78,7 +79,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { c.expectNoMsg(100.millis) } - "drop empty groups" in { + "drop empty groups" taggedAs TimingTest in { val p = TestPublisher.manualProbe[Int]() val c = TestSubscriber.manualProbe[immutable.Seq[Int]]() Source.fromPublisher(p).groupedWithin(1000, 500.millis).to(Sink.fromSubscriber(c)).run() @@ -99,7 +100,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { c.expectNoMsg(100.millis) } - "reset time window when max elements reached" in { + "reset time window when max elements reached" taggedAs TimingTest in { val inputs = Iterator.from(1) val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[immutable.Seq[Int]]() @@ -124,18 +125,18 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { downstream.expectNoMsg(100.millis) } - "group evenly" in { + "group evenly" taggedAs TimingTest in { def script = Script(TestConfig.RandomTestRange map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) → Seq(immutable.Seq(x, y, z)) }: _*) TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) } - "group with rest" in { + "group with rest" taggedAs TimingTest in { def script = Script((TestConfig.RandomTestRange.map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) → Seq(immutable.Seq(x, y, z)) } :+ { val x = random.nextInt(); Seq(x) → Seq(immutable.Seq(x)) }): _*) TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.groupedWithin(3, 10.minutes))) } - "group with small groups with backpressure" in { + "group with small groups with backpressure" taggedAs TimingTest in { Source(1 to 10) .groupedWithin(1, 1.day) .throttle(1, 110.millis, 0, ThrottleMode.Shaping) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala index 2b728f1204..2e432fe17a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import scala.concurrent.duration._ import akka.stream.testkit.Utils.TE import akka.stream.testkit.{ TestPublisher, TestSubscriber } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } @@ -102,7 +103,7 @@ class FlowOrElseSpec extends AkkaSpec { "complete when both inputs completes without emitting elements, regardless of order" in new OrElseProbedFlow { outProbe.ensureSubscription() inProbe2.sendComplete() - outProbe.expectNoMsg() // make sure it did not complete here + outProbe.expectNoMsg(200.millis) // make sure it did not complete here inProbe1.sendComplete() outProbe.expectComplete() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala index 857bf05b54..8353498a46 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala @@ -16,6 +16,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import scala.collection.immutable import scala.concurrent.duration._ import scala.util.Random +import akka.testkit.LongRunningTest class FramingSpec extends StreamSpec { @@ -70,6 +71,8 @@ class FramingSpec extends StreamSpec { val rechunk = Flow[ByteString].via(new Rechunker).named("rechunker") + override def expectedTestDuration = 2.minutes + "Delimiter bytes based framing" must { val delimiterBytes = List("\n", "\r\n", "FOO").map(ByteString(_)) @@ -85,7 +88,7 @@ class FramingSpec extends StreamSpec { yield delimiter.take(prefix) ++ s "work with various delimiters and test sequences" in { - for (delimiter ← delimiterBytes; _ ← 1 to 100) { + for (delimiter ← delimiterBytes; _ ← 1 to 5) { val testSequence = completeTestSequences(delimiter) val f = Source(testSequence) .map(_ ++ delimiter) @@ -161,9 +164,8 @@ class FramingSpec extends StreamSpec { ByteString(Array.ofDim[Byte](fieldOffset)) ++ header ++ payload } - "work with various byte orders, frame lengths and offsets" in { + "work with various byte orders, frame lengths and offsets" taggedAs LongRunningTest in { for { - _ ← (1 to 10) byteOrder ← byteOrders fieldOffset ← fieldOffsets fieldLength ← fieldLengths @@ -229,7 +231,7 @@ class FramingSpec extends StreamSpec { .failed.futureValue shouldBe a[FramingException] } - "report truncated frames" in { + "report truncated frames" taggedAs LongRunningTest in { for { //_ ← 1 to 10 byteOrder ← byteOrders diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 73531d4f77..d99665da2a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -153,7 +153,7 @@ class QueueSinkSpec extends StreamSpec { expectMsg(Some(1)) queue.pull().pipeTo(testActor) - expectNoMsg() // element requested but buffer empty + expectNoMsg(200.millis) // element requested but buffer empty sub.sendNext(2) expectMsg(Some(2)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala index 157fd54708..d28a1a89e3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -14,12 +14,18 @@ import scala.concurrent._ import akka.Done import akka.stream.testkit.{ StreamSpec, TestSubscriber, TestSourceStage, GraphStageMessages } import akka.stream.testkit.scaladsl.TestSink +import org.scalatest.time.Span class QueueSourceSpec extends StreamSpec { implicit val materializer = ActorMaterializer() implicit val ec = system.dispatcher val pause = 300.millis + // more frequent checks than defaults from AkkaSpec + implicit val testPatience = PatienceConfig( + testKitSettings.DefaultTimeout.duration, + Span(5, org.scalatest.time.Millis)) + def assertSuccess(f: Future[QueueOfferResult]): Unit = { f.futureValue should ===(QueueOfferResult.Enqueued) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 5549bb3704..32ebd05be5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -7,13 +7,14 @@ import scala.concurrent.duration._ import akka.stream.{ ClosedShape, ActorMaterializer } import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.testkit.TimingTest class TickSourceSpec extends StreamSpec { implicit val materializer = ActorMaterializer() "A Flow based on tick publisher" must { - "produce ticks" in assertAllStagesStopped { + "produce ticks" taggedAs TimingTest in assertAllStagesStopped { val c = TestSubscriber.manualProbe[String]() Source.tick(1.second, 1.second, "tick").to(Sink.fromSubscriber(c)).run() val sub = c.expectSubscription() @@ -25,7 +26,7 @@ class TickSourceSpec extends StreamSpec { c.expectNoMsg(200.millis) } - "drop ticks when not requested" in { + "drop ticks when not requested" taggedAs TimingTest in { val c = TestSubscriber.manualProbe[String]() Source.tick(1.second, 1.second, "tick").to(Sink.fromSubscriber(c)).run() val sub = c.expectSubscription() @@ -42,7 +43,7 @@ class TickSourceSpec extends StreamSpec { c.expectNoMsg(200.millis) } - "reject multiple subscribers, but keep the first" in { + "reject multiple subscribers, but keep the first" taggedAs TimingTest in { val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.asPublisher(false)) val c1 = TestSubscriber.manualProbe[String]() val c2 = TestSubscriber.manualProbe[String]() @@ -58,7 +59,7 @@ class TickSourceSpec extends StreamSpec { sub1.cancel() } - "be usable with zip for a simple form of rate limiting" in { + "be usable with zip for a simple form of rate limiting" taggedAs TimingTest in { val c = TestSubscriber.manualProbe[Int]() RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ @@ -79,7 +80,7 @@ class TickSourceSpec extends StreamSpec { sub.cancel() } - "be possible to cancel" in assertAllStagesStopped { + "be possible to cancel" taggedAs TimingTest in assertAllStagesStopped { val c = TestSubscriber.manualProbe[String]() val tickSource = Source.tick(1.second, 1.second, "tick") val cancellable = tickSource.to(Sink.fromSubscriber(c)).run() @@ -95,7 +96,7 @@ class TickSourceSpec extends StreamSpec { c.expectComplete() } - "acknowledge cancellation only once" in assertAllStagesStopped { + "acknowledge cancellation only once" taggedAs TimingTest in assertAllStagesStopped { val c = TestSubscriber.manualProbe[String]() val cancellable = Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run() val sub = c.expectSubscription() @@ -106,7 +107,7 @@ class TickSourceSpec extends StreamSpec { c.expectComplete() } - "have isCancelled mirror the cancellation state" in assertAllStagesStopped { + "have isCancelled mirror the cancellation state" taggedAs TimingTest in assertAllStagesStopped { val c = TestSubscriber.manualProbe[String]() val cancellable = Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run() val sub = c.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 501e1dec44..aae573eb12 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -227,7 +227,7 @@ object Source { * `create` factory is never called and the materialized `CompletionStage` is failed. */ def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] = - scaladsl.Source.lazily[T, M](() => create.create().asScala).mapMaterializedValue(_.toJava).asJava + scaladsl.Source.lazily[T, M](() ⇒ create.create().asScala).mapMaterializedValue(_.toJava).asJava /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index a7adcfb691..34e09ec9f3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -360,7 +360,7 @@ object Source { * the materialized future is completed with its value, if downstream cancels or fails without any demand the * create factory is never called and the materialized `Future` is failed. */ - def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] = + def lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]] = Source.fromGraph(new LazySource[T, M](create)) /** diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala index 2990f2cc98..3b7cd7a193 100644 --- a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala @@ -83,13 +83,16 @@ private[akka] trait MetricsKit extends MetricsKitOps { clearMetrics() } + def reportMetricsEnabled: Boolean = true + /** * Causes immediate flush of metrics, using all registered reporters. * * HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting. */ def reportMetrics() { - reporters foreach { _.report() } + if (reportMetricsEnabled) + reporters foreach { _.report() } } /** diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitOps.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitOps.scala index 6f46358d72..506a8db75d 100644 --- a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitOps.scala +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitOps.scala @@ -54,10 +54,12 @@ private[akka] trait MetricsKitOps extends MetricKeyDSL { registry.histogram((key / "histogram").toString) } + def forceGcEnabled: Boolean = true + /** Yet another delegate to `System.gc()` */ def gc() { - // todo add some form of logging, to differentiate manual gc calls from "normal" ones - System.gc() + if (forceGcEnabled) + System.gc() } /** diff --git a/project/MultiNode.scala b/project/MultiNode.scala index 6f16c5babd..dad5945232 100644 --- a/project/MultiNode.scala +++ b/project/MultiNode.scala @@ -12,6 +12,11 @@ import sbt._ import sbt.Keys._ object MultiNode extends AutoPlugin { + + // MultiJvm tests can be excluded from normal test target an validatePullRequest + // with -Dakka.test.multi-in-test=false + val multiNodeTestInTest: Boolean = + System.getProperty("akka.test.multi-in-test", "true") == "true" object CliOptions { val multiNode = CliOption("akka.test.multi-node", false) @@ -60,19 +65,21 @@ object MultiNode extends AutoPlugin { CliOptions.hostsFileName.map(multiNodeHostsFileName in MultiJvm := _) ++ CliOptions.javaName.map(multiNodeJavaName in MultiJvm := _) ++ CliOptions.targetDirName.map(multiNodeTargetDirName in MultiJvm := _) ++ - // make sure that MultiJvm tests are executed by the default test target, - // and combine the results from ordinary test and multi-jvm tests - (executeTests in Test <<= (executeTests in Test, multiExecuteTests) map { - case (testResults, multiNodeResults) => - val overall = - if (testResults.overall.id < multiNodeResults.overall.id) - multiNodeResults.overall - else - testResults.overall - Tests.Output(overall, - testResults.events ++ multiNodeResults.events, - testResults.summaries ++ multiNodeResults.summaries) - }) + (if (multiNodeTestInTest) { + // make sure that MultiJvm tests are executed by the default test target, + // and combine the results from ordinary test and multi-jvm tests + (executeTests in Test <<= (executeTests in Test, multiExecuteTests) map { + case (testResults, multiNodeResults) => + val overall = + if (testResults.overall.id < multiNodeResults.overall.id) + multiNodeResults.overall + else + testResults.overall + Tests.Output(overall, + testResults.events ++ multiNodeResults.events, + testResults.summaries ++ multiNodeResults.summaries) + }) + } else Nil) } /** diff --git a/project/ValidatePullRequest.scala b/project/ValidatePullRequest.scala index 5015162392..dcf129d61f 100644 --- a/project/ValidatePullRequest.scala +++ b/project/ValidatePullRequest.scala @@ -264,9 +264,9 @@ object MultiNodeWithPrValidation extends AutoPlugin { override def trigger = allRequirements override def requires = ValidatePullRequest && MultiNode - override lazy val projectSettings = Seq( - additionalTasks in ValidatePR += MultiNode.multiTest - ) + override lazy val projectSettings = + if (MultiNode.multiNodeTestInTest) Seq(additionalTasks in ValidatePR += MultiNode.multiTest) + else Nil } /**