Speedup pull request validation

* speedup ActorCreationPerfSpec
* reduce iterations in ConsistencySpec
* tag SupervisorHierarchySpec as LongRunningTest
* various small speedups and tagging in actor-tests
* speedup expectNoMsg in stream-tests
* tag FramingSpec, and reduce iterations
* speedup QueueSourceSpec
* tag some stream-tests
* reduce iterations in persistence.PerformanceSpec
* reduce iterations in some cluster perf tests
* tag RemoteWatcherSpec
* tag InterpreterStressSpec
* remove LongRunning from ClusterConsistentHashingRouterSpec
* sys property to disable multi-jvm tests in test
* actually disable multi-node tests in validatePullRequest
* doc sbt flags in CONTRIBUTING
This commit is contained in:
Patrik Nordwall 2016-11-29 08:33:36 +01:00
parent 267f31149c
commit e04444567f
36 changed files with 168 additions and 114 deletions

View file

@ -254,7 +254,13 @@ In order to speed up PR validation times, the Akka build contains a special sbt
which is smart enough to figure out which projects should be built if a PR only has changes in some parts of the project.
For example, if your PR only touches `akka-persistence`, no `akka-remote` tests need to be run, however the task
will validate all projects that depend on `akka-persistence` (including samples).
Also, tests tagged as `PerformanceTest` and the likes of it are excluded from PR validation.
Also, tests tagged as `PerformanceTest`, `TimingTest`, `LongRunningTest` and all multi-node tests are excluded from PR validation.
You can exclude the same kind of tests in your local build by starting sbt with:
```
sbt -Dakka.test.tags.exclude=performance,timing,long-running -Dakka.test.multi-in-test=false
```
In order to force the `validatePullRequest` task to build the entire project, regardless of dependency analysis of a PRs
changes one can use the special `PLS BUILD ALL` command (typed in a comment on Github, on the Pull Request), which will cause

View file

@ -11,9 +11,27 @@ import akka.testkit.metrics._
import org.scalatest.BeforeAndAfterAll
import akka.testkit.metrics.HeapMemoryUsage
import com.codahale.metrics.{ Histogram }
import com.typesafe.config.ConfigFactory
object ActorCreationPerfSpec {
val config = ConfigFactory.parseString("""
akka.test.actor.ActorPerfSpec {
warmUp = 5
numberOfActors = 10
numberOfRepeats = 1
force-gc = off
report-metrics = off
# For serious measurements use something like the following values
#warmUp = 50000
#numberOfActors = 100000
#numberOfRepeats = 3
#force-gc = on
#report-metrics = on
}
akka.actor.serialize-messages = off
""")
final case class Create(number: Int, props: () Props)
case object Created
case object IsAlive
@ -98,7 +116,7 @@ object ActorCreationPerfSpec {
}
}
class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender
class ActorCreationPerfSpec extends AkkaSpec(ActorCreationPerfSpec.config) with ImplicitSender
with MetricsKit with BeforeAndAfterAll {
import ActorCreationPerfSpec._
@ -108,9 +126,11 @@ class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = of
val BlockingTimeKey = ActorCreationKey / "synchronous-part"
val TotalTimeKey = ActorCreationKey / "total"
val warmUp: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.warmUp", 50000)
val nrOfActors: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfActors", 100000)
val nrOfRepeats: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfRepeats", 3)
val warmUp = metricsConfig.getInt("akka.test.actor.ActorPerfSpec.warmUp")
val nrOfActors = metricsConfig.getInt("akka.test.actor.ActorPerfSpec.numberOfActors")
val nrOfRepeats = metricsConfig.getInt("akka.test.actor.ActorPerfSpec.numberOfRepeats")
override val reportMetricsEnabled = metricsConfig.getBoolean("akka.test.actor.ActorPerfSpec.report-metrics")
override val forceGcEnabled = metricsConfig.getBoolean("akka.test.actor.ActorPerfSpec.force-gc")
def runWithCounterInside(metricName: String, scenarioName: String, number: Int, propsCreator: () Props) {
val hist = histogram(BlockingTimeKey / metricName)

View file

@ -12,6 +12,7 @@ import akka.event.Logging.Warning
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
import akka.testkit.TimingTest
class ActorDSLDummy {
//#import
@ -79,7 +80,7 @@ class ActorDSLSpec extends AkkaSpec {
i.receive() should ===("hello")
}
"have a maximum queue size" in {
"have a maximum queue size" taggedAs TimingTest in {
val i = inbox()
system.eventStream.subscribe(testActor, classOf[Warning])
try {
@ -101,7 +102,7 @@ class ActorDSLSpec extends AkkaSpec {
}
}
"have a default and custom timeouts" in {
"have a default and custom timeouts" taggedAs TimingTest in {
val i = inbox()
within(5 seconds, 6 seconds) {
intercept[TimeoutException](i.receive())

View file

@ -35,7 +35,7 @@ object ActorSelectionSpec {
}
class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTimeout {
class ActorSelectionSpec extends AkkaSpec with DefaultTimeout {
import ActorSelectionSpec._
val c1 = system.actorOf(p, "c1")
@ -298,7 +298,7 @@ class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTim
case `c2` lastSender
}
actors should ===(Seq(c21))
expectNoMsg(1 second)
expectNoMsg(200.millis)
}
"resolve one actor with explicit timeout" in {

View file

@ -339,18 +339,13 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
val system2 = ActorSystem(name = "default", config = Some(config), defaultExecutionContext = Some(ec))
try {
val ref = system2.actorOf(Props(new Actor {
def receive = {
case "ping" sender() ! "pong"
}
}))
val ref = system2.actorOf(TestActors.echoActorProps)
val probe = TestProbe()
ref.tell("ping", probe.ref)
ecProbe.expectNoMsg()
probe.expectMsg(1.second, "pong")
ecProbe.expectNoMsg(200.millis)
probe.expectMsg(1.second, "ping")
} finally {
shutdown(system2)
}

View file

@ -60,7 +60,7 @@ class ConsistencySpec extends AkkaSpec(ConsistencySpec.config) {
val props = Props[ConsistencyCheckingActor].withDispatcher("consistency-dispatcher")
val actors = Vector.fill(noOfActors)(system.actorOf(props))
for (i 0L until 100000L) {
for (i 0L until 10000L) {
actors.foreach(_.tell(i, testActor))
}

View file

@ -337,7 +337,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" → true)) with I
expectMsg(Transition(fsmref, 0, 1))
}
"allow cancelling stateTimeout by issuing forMax(Duration.Inf)" in {
"allow cancelling stateTimeout by issuing forMax(Duration.Inf)" taggedAs TimingTest in {
val sys = ActorSystem("fsmEvent")
val p = TestProbe()(sys)

View file

@ -21,7 +21,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
"An actor with receive timeout" must {
"get timeout" in {
"get timeout" taggedAs TimingTest in {
val timeoutLatch = TestLatch()
val timeoutActor = system.actorOf(Props(new Actor {
@ -36,7 +36,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
system.stop(timeoutActor)
}
"reschedule timeout after regular receive" in {
"reschedule timeout after regular receive" taggedAs TimingTest in {
val timeoutLatch = TestLatch()
val timeoutActor = system.actorOf(Props(new Actor {
@ -54,7 +54,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
system.stop(timeoutActor)
}
"be able to turn off timeout if desired" in {
"be able to turn off timeout if desired" taggedAs TimingTest in {
val count = new AtomicInteger(0)
val timeoutLatch = TestLatch()
@ -77,7 +77,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
system.stop(timeoutActor)
}
"not receive timeout message when not specified" in {
"not receive timeout message when not specified" taggedAs TimingTest in {
val timeoutLatch = TestLatch()
val timeoutActor = system.actorOf(Props(new Actor {
@ -90,7 +90,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
system.stop(timeoutActor)
}
"get timeout while receiving NotInfluenceReceiveTimeout messages" in {
"get timeout while receiving NotInfluenceReceiveTimeout messages" taggedAs TimingTest in {
val timeoutLatch = TestLatch()
val timeoutActor = system.actorOf(Props(new Actor {

View file

@ -152,7 +152,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
ticks.get should ===(1)
}
"be canceled if cancel is performed before execution" in {
"be canceled if cancel is performed before execution" taggedAs TimingTest in {
val task = collectCancellable(system.scheduler.scheduleOnce(10 seconds)(()))
task.cancel() should ===(true)
task.isCancelled should ===(true)
@ -160,7 +160,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
task.isCancelled should ===(true)
}
"not be canceled if cancel is performed after execution" in {
"not be canceled if cancel is performed after execution" taggedAs TimingTest in {
val latch = TestLatch(1)
val task = collectCancellable(system.scheduler.scheduleOnce(10 millis)(latch.countDown()))
Await.ready(latch, remainingOrDefault)
@ -312,7 +312,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
"A LightArrayRevolverScheduler" must {
"reject tasks scheduled too far into the future" in {
"reject tasks scheduled too far into the future" taggedAs TimingTest in {
val maxDelay = tickDuration * Int.MaxValue
import system.dispatcher
system.scheduler.scheduleOnce(maxDelay, testActor, "OK")
@ -321,7 +321,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"reject periodic tasks scheduled too far into the future" in {
"reject periodic tasks scheduled too far into the future" taggedAs TimingTest in {
val maxDelay = tickDuration * Int.MaxValue
import system.dispatcher
system.scheduler.schedule(maxDelay, 1.second, testActor, "OK")
@ -330,7 +330,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"reject periodic tasks scheduled with too long interval" in {
"reject periodic tasks scheduled with too long interval" taggedAs TimingTest in {
val maxDelay = tickDuration * Int.MaxValue
import system.dispatcher
system.scheduler.schedule(100.millis, maxDelay, testActor, "OK")
@ -373,7 +373,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
expectNoMsg(1.second)
}
"survive vicious enqueueing" in {
"survive vicious enqueueing" taggedAs TimingTest in {
withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver)
import driver._
import system.dispatcher
@ -396,7 +396,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"execute multiple jobs at once when expiring multiple buckets" in {
"execute multiple jobs at once when expiring multiple buckets" taggedAs TimingTest in {
withScheduler() { (sched, driver)
implicit def ec = localEC
import driver._
@ -411,7 +411,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"properly defer jobs even when the timer thread oversleeps" in {
"properly defer jobs even when the timer thread oversleeps" taggedAs TimingTest in {
withScheduler() { (sched, driver)
implicit def ec = localEC
import driver._
@ -426,7 +426,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"correctly wrap around wheel rounds" in {
"correctly wrap around wheel rounds" taggedAs TimingTest in {
withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver)
implicit def ec = localEC
import driver._
@ -453,7 +453,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"correctly execute jobs when clock wraps around" in {
"correctly execute jobs when clock wraps around" taggedAs TimingTest in {
withScheduler(Long.MaxValue - 200000000L) { (sched, driver)
implicit def ec = localEC
import driver._
@ -480,7 +480,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"correctly wrap around ticks" in {
"correctly wrap around ticks" taggedAs TimingTest in {
val numEvents = 40
val targetTicks = Int.MaxValue - numEvents + 20
@ -507,7 +507,7 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"reliably reject jobs when shutting down" in {
"reliably reject jobs when shutting down" taggedAs TimingTest in {
withScheduler() { (sched, driver)
import system.dispatcher
val counter = new AtomicInteger

View file

@ -24,6 +24,7 @@ import akka.event.Logging
import java.util.concurrent.atomic.AtomicInteger
import java.lang.System.identityHashCode
import akka.util.Helpers.ConfigOps
import akka.testkit.LongRunningTest
object SupervisorHierarchySpec {
import FSM.`→`
@ -728,7 +729,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
"A Supervisor Hierarchy" must {
"restart manager and workers in AllForOne" in {
"restart manager and workers in AllForOne" taggedAs LongRunningTest in {
val countDown = new CountDownLatch(4)
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Exception])))))
@ -749,7 +750,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
}
}
"send notification to supervisor when permanent failure" in {
"send notification to supervisor when permanent failure" taggedAs LongRunningTest in {
val countDownMessages = new CountDownLatch(1)
val countDownMax = new CountDownLatch(1)
val boss = system.actorOf(Props(new Actor {
@ -773,7 +774,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
}
}
"resume children after Resume" in {
"resume children after Resume" taggedAs LongRunningTest in {
val boss = system.actorOf(Props[Resumer], "resumer")
boss ! "spawn"
val middle = expectMsgType[ActorRef]
@ -790,7 +791,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
expectMsg("pong")
}
"suspend children while failing" in {
"suspend children while failing" taggedAs LongRunningTest in {
val latch = TestLatch()
val slowResumer = system.actorOf(Props(new Actor {
override def supervisorStrategy = OneForOneStrategy() { case _ Await.ready(latch, 4.seconds.dilated); SupervisorStrategy.Resume }
@ -816,7 +817,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
expectMsg("pong")
}
"handle failure in creation when supervision startegy returns Resume and Restart" in {
"handle failure in creation when supervision startegy returns Resume and Restart" taggedAs LongRunningTest in {
val createAttempt = new AtomicInteger(0)
val preStartCalled = new AtomicInteger(0)
val postRestartCalled = new AtomicInteger(0)
@ -866,7 +867,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
postRestartCalled.get should ===(0)
}
"survive being stressed" in {
"survive being stressed" taggedAs LongRunningTest in {
system.eventStream.publish(Mute(
EventFilter[Failure](),
EventFilter.warning("Failure"),

View file

@ -80,7 +80,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
}
"A cluster router with a consistent hashing pool" must {
"start cluster with 2 nodes" taggedAs LongRunningTest in {
"start cluster with 2 nodes" in {
awaitClusterUp(first, second)
enterBarrier("after-1")
}
@ -105,7 +105,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
enterBarrier("after-2")
}
"deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in {
"deploy routees to new member nodes in the cluster" in {
awaitClusterUp(first, second, third)
@ -119,7 +119,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
enterBarrier("after-3")
}
"deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
"deploy programatically defined routees to the member nodes in the cluster" in {
runOn(first) {
val router2 = system.actorOf(
ClusterRouterPool(
@ -136,7 +136,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
enterBarrier("after-4")
}
"handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in {
"handle combination of configured router and programatically defined hashMapping" in {
runOn(first) {
def hashMapping: ConsistentHashMapping = {
case s: String s
@ -150,7 +150,7 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
enterBarrier("after-5")
}
"handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in {
"handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" in {
runOn(first) {
def hashMapping: ConsistentHashMapping = {
case s: String s

View file

@ -13,6 +13,7 @@ import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.remote.RARP
import akka.testkit.AkkaSpec
import akka.testkit.TimingTest
object AutoDownSpec {
final case class DownCalled(address: Address)
@ -92,7 +93,7 @@ class AutoDownSpec extends AkkaSpec("akka.actor.provider=remote") {
expectMsg(DownCalled(memberC.address))
}
"not down unreachable when losing leadership inbetween detection and specified duration" in {
"not down unreachable when losing leadership inbetween detection and specified duration" taggedAs TimingTest in {
val a = autoDownActor(2.seconds)
a ! LeaderChanged(Some(memberA.address))
a ! UnreachableMember(memberC)
@ -100,7 +101,7 @@ class AutoDownSpec extends AkkaSpec("akka.actor.provider=remote") {
expectNoMsg(3.second)
}
"not down when unreachable become reachable inbetween detection and specified duration" in {
"not down when unreachable become reachable inbetween detection and specified duration" taggedAs TimingTest in {
val a = autoDownActor(2.seconds)
a ! LeaderChanged(Some(memberA.address))
a ! UnreachableMember(memberB)
@ -108,7 +109,7 @@ class AutoDownSpec extends AkkaSpec("akka.actor.provider=remote") {
expectNoMsg(3.second)
}
"not down when unreachable is removed inbetween detection and specified duration" in {
"not down when unreachable is removed inbetween detection and specified duration" taggedAs TimingTest in {
val a = autoDownActor(2.seconds)
a ! LeaderChanged(Some(memberA.address))
a ! UnreachableMember(memberB)

View file

@ -10,7 +10,8 @@ import akka.actor.Address
class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers {
val nodesSize = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.nodesSize").getOrElse("250").toInt
val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("10000").toInt
// increase for serious measurements
val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("1000").toInt
def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = {
val nodes = (1 to size).map(n UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n))

View file

@ -10,7 +10,8 @@ import akka.actor.Address
class ReachabilityPerfSpec extends WordSpec with Matchers {
val nodesSize = sys.props.get("akka.cluster.ReachabilityPerfSpec.nodesSize").getOrElse("250").toInt
val iterations = sys.props.get("akka.cluster.ReachabilityPerfSpec.iterations").getOrElse("10000").toInt
// increase for serious measurements
val iterations = sys.props.get("akka.cluster.ReachabilityPerfSpec.iterations").getOrElse("100").toInt
val address = Address("akka.tcp", "sys", "a", 2552)
val node = Address("akka.tcp", "sys", "a", 2552)

View file

@ -32,7 +32,8 @@ class VectorClockPerfSpec extends WordSpec with Matchers {
import VectorClockPerfSpec._
val clockSize = sys.props.get("akka.cluster.VectorClockPerfSpec.clockSize").getOrElse("1000").toInt
val iterations = sys.props.get("akka.cluster.VectorClockPerfSpec.iterations").getOrElse("10000").toInt
// increase for serious measurements
val iterations = sys.props.get("akka.cluster.VectorClockPerfSpec.iterations").getOrElse("1000").toInt
val (vcBefore, nodes) = createVectorClockOfSize(clockSize)
val firstNode = nodes.head

View file

@ -62,7 +62,7 @@ object PersistenceDocSpec {
trait MyPersistentActor5 extends PersistentActor {
//#recovery-no-snap
override def recovery =
override def recovery =
Recovery(fromSnapshot = SnapshotSelectionCriteria.None)
//#recovery-no-snap
}

View file

@ -11,11 +11,11 @@ import akka.actor._
import akka.testkit._
object PerformanceSpec {
// multiply cycles with 200 for more
// accurate throughput measurements
val config =
"""
akka.persistence.performance.cycles.load = 1000
akka.persistence.performance.cycles.load = 100
# more accurate throughput measurements
#akka.persistence.performance.cycles.load = 200000
"""
case object StopMeasure

View file

@ -161,7 +161,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectNoMsg(2 seconds)
}
"generate AddressTerminated when missing heartbeats" in {
"generate AddressTerminated when missing heartbeats" taggedAs LongRunningTest in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
@ -198,7 +198,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectNoMsg(2 seconds)
}
"generate AddressTerminated when missing first heartbeat" in {
"generate AddressTerminated when missing first heartbeat" taggedAs LongRunningTest in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
@ -234,7 +234,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectNoMsg(2 seconds)
}
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" taggedAs LongRunningTest in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])

View file

@ -161,7 +161,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectNoMsg(2 seconds)
}
"generate AddressTerminated when missing heartbeats" in {
"generate AddressTerminated when missing heartbeats" taggedAs LongRunningTest in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
@ -198,7 +198,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectNoMsg(2 seconds)
}
"generate AddressTerminated when missing first heartbeat" in {
"generate AddressTerminated when missing first heartbeat" taggedAs LongRunningTest in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
@ -234,7 +234,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectNoMsg(2 seconds)
}
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" taggedAs LongRunningTest in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])

View file

@ -177,7 +177,7 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String)
override def beforeTermination() {
system.eventStream.publish(TestEvent.Mute(
EventFilter.warning(source = "akka://AkkaProtocolStressTest/user/$a", start = "received dead letter"),
EventFilter.warning(source = s"akka://AkkaProtocolStressTest/user/$$a", start = "received dead letter"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
systemB.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),

View file

@ -6,6 +6,7 @@ package akka.stream.impl.fusing
import akka.stream.impl.ConstantFun
import akka.stream.Supervision
import akka.stream.testkit.StreamSpec
import akka.testkit.LongRunningTest
class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit {
import Supervision.stoppingDecider
@ -23,7 +24,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit {
"Interpreter" must {
"work with a massive chain of maps" in new OneBoundedSetup[Int](Vector.fill(chainLength)(map): _*) {
"work with a massive chain of maps" taggedAs LongRunningTest in new OneBoundedSetup[Int](Vector.fill(chainLength)(map): _*) {
lastEvents() should be(Set.empty)
val tstamp = System.nanoTime()
@ -45,7 +46,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit {
info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s")
}
"work with a massive chain of maps with early complete" in new OneBoundedSetup[Int](
"work with a massive chain of maps with early complete" taggedAs LongRunningTest in new OneBoundedSetup[Int](
Vector.fill(halfLength)(map) ++
Seq(takeHalfOfRepetition) ++
Vector.fill(halfLength)(map): _*) {

View file

@ -3,6 +3,7 @@
*/
package akka.stream.scaladsl
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.ActorMaterializer
import akka.stream.Attributes.inputBuffer
@ -100,7 +101,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
expectMsg(initMessage)
publisher.sendNext(1)
expectNoMsg()
expectNoMsg(200.millis)
fw ! TriggerAckMessage
expectMsg(1)
@ -150,7 +151,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
expectMsg(1)
fw ! TriggerAckMessage
expectNoMsg() // Ack received but buffer empty
expectNoMsg(200.millis) // Ack received but buffer empty
publisher.sendNext(2) // Buffer this value
fw ! TriggerAckMessage

View file

@ -14,6 +14,7 @@ import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.ThrottleMode
import akka.testkit.TimingTest
class FlowDelaySpec extends StreamSpec {
@ -21,13 +22,13 @@ class FlowDelaySpec extends StreamSpec {
"A Delay" must {
"deliver elements with some time shift" in {
"deliver elements with some time shift" taggedAs TimingTest in {
Await.result(
Source(1 to 10).delay(1.seconds).grouped(100).runWith(Sink.head),
1200.millis) should ===(1 to 10)
}
"add delay to initialDelay if exists upstream" in {
"add delay to initialDelay if exists upstream" taggedAs TimingTest in {
Source(1 to 10).initialDelay(1.second).delay(1.second).runWith(TestSink.probe[Int])
.request(10)
.expectNoMsg(1800.millis)
@ -127,7 +128,7 @@ class FlowDelaySpec extends StreamSpec {
pSub.sendError(new RuntimeException() with NoStackTrace)
}
"properly delay according to buffer size" in {
"properly delay according to buffer size" taggedAs TimingTest in {
import akka.pattern.pipe
import system.dispatcher

View file

@ -16,6 +16,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
import scala.util.control.NoStackTrace
import akka.testkit.LongRunningTest
class FlowFoldAsyncSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
@ -43,7 +44,7 @@ class FlowFoldAsyncSpec extends StreamSpec {
inputSource.runWith(foldSink).futureValue(timeout) should ===(expected)
}
"work when using Flow.foldAsync" in assertAllStagesStopped {
"work when using Flow.foldAsync" taggedAs LongRunningTest in assertAllStagesStopped {
val flowTimeout =
Timeout((flowDelayMS * input.size).milliseconds + 3.seconds)

View file

@ -11,6 +11,7 @@ import akka.stream.testkit._
import akka.stream.testkit.Utils._
import scala.concurrent.Await
import akka.testkit.TimingTest
class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
@ -20,7 +21,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
"A GroupedWithin" must {
"group elements within the duration" in assertAllStagesStopped {
"group elements within the duration" taggedAs TimingTest in assertAllStagesStopped {
val input = Iterator.from(1)
val p = TestPublisher.manualProbe[Int]()
val c = TestSubscriber.manualProbe[immutable.Seq[Int]]()
@ -47,7 +48,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
c.expectNoMsg(200.millis)
}
"deliver bufferd elements onComplete before the timeout" in {
"deliver bufferd elements onComplete before the timeout" taggedAs TimingTest in {
val c = TestSubscriber.manualProbe[immutable.Seq[Int]]()
Source(1 to 3).groupedWithin(1000, 10.second).to(Sink.fromSubscriber(c)).run()
val cSub = c.expectSubscription
@ -57,7 +58,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
c.expectNoMsg(200.millis)
}
"buffer groups until requested from downstream" in {
"buffer groups until requested from downstream" taggedAs TimingTest in {
val input = Iterator.from(1)
val p = TestPublisher.manualProbe[Int]()
val c = TestSubscriber.manualProbe[immutable.Seq[Int]]()
@ -78,7 +79,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
c.expectNoMsg(100.millis)
}
"drop empty groups" in {
"drop empty groups" taggedAs TimingTest in {
val p = TestPublisher.manualProbe[Int]()
val c = TestSubscriber.manualProbe[immutable.Seq[Int]]()
Source.fromPublisher(p).groupedWithin(1000, 500.millis).to(Sink.fromSubscriber(c)).run()
@ -99,7 +100,7 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
c.expectNoMsg(100.millis)
}
"reset time window when max elements reached" in {
"reset time window when max elements reached" taggedAs TimingTest in {
val inputs = Iterator.from(1)
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[immutable.Seq[Int]]()
@ -124,18 +125,18 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
downstream.expectNoMsg(100.millis)
}
"group evenly" in {
"group evenly" taggedAs TimingTest in {
def script = Script(TestConfig.RandomTestRange map { _ val x, y, z = random.nextInt(); Seq(x, y, z) Seq(immutable.Seq(x, y, z)) }: _*)
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.groupedWithin(3, 10.minutes)))
}
"group with rest" in {
"group with rest" taggedAs TimingTest in {
def script = Script((TestConfig.RandomTestRange.map { _ val x, y, z = random.nextInt(); Seq(x, y, z) Seq(immutable.Seq(x, y, z)) }
:+ { val x = random.nextInt(); Seq(x) Seq(immutable.Seq(x)) }): _*)
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.groupedWithin(3, 10.minutes)))
}
"group with small groups with backpressure" in {
"group with small groups with backpressure" taggedAs TimingTest in {
Source(1 to 10)
.groupedWithin(1, 1.day)
.throttle(1, 110.millis, 0, ThrottleMode.Shaping)

View file

@ -3,6 +3,7 @@
*/
package akka.stream.scaladsl
import scala.concurrent.duration._
import akka.stream.testkit.Utils.TE
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
@ -102,7 +103,7 @@ class FlowOrElseSpec extends AkkaSpec {
"complete when both inputs completes without emitting elements, regardless of order" in new OrElseProbedFlow {
outProbe.ensureSubscription()
inProbe2.sendComplete()
outProbe.expectNoMsg() // make sure it did not complete here
outProbe.expectNoMsg(200.millis) // make sure it did not complete here
inProbe1.sendComplete()
outProbe.expectComplete()
}

View file

@ -16,6 +16,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
import scala.collection.immutable
import scala.concurrent.duration._
import scala.util.Random
import akka.testkit.LongRunningTest
class FramingSpec extends StreamSpec {
@ -70,6 +71,8 @@ class FramingSpec extends StreamSpec {
val rechunk = Flow[ByteString].via(new Rechunker).named("rechunker")
override def expectedTestDuration = 2.minutes
"Delimiter bytes based framing" must {
val delimiterBytes = List("\n", "\r\n", "FOO").map(ByteString(_))
@ -85,7 +88,7 @@ class FramingSpec extends StreamSpec {
yield delimiter.take(prefix) ++ s
"work with various delimiters and test sequences" in {
for (delimiter delimiterBytes; _ 1 to 100) {
for (delimiter delimiterBytes; _ 1 to 5) {
val testSequence = completeTestSequences(delimiter)
val f = Source(testSequence)
.map(_ ++ delimiter)
@ -161,9 +164,8 @@ class FramingSpec extends StreamSpec {
ByteString(Array.ofDim[Byte](fieldOffset)) ++ header ++ payload
}
"work with various byte orders, frame lengths and offsets" in {
"work with various byte orders, frame lengths and offsets" taggedAs LongRunningTest in {
for {
_ (1 to 10)
byteOrder byteOrders
fieldOffset fieldOffsets
fieldLength fieldLengths
@ -229,7 +231,7 @@ class FramingSpec extends StreamSpec {
.failed.futureValue shouldBe a[FramingException]
}
"report truncated frames" in {
"report truncated frames" taggedAs LongRunningTest in {
for {
//_ 1 to 10
byteOrder byteOrders

View file

@ -153,7 +153,7 @@ class QueueSinkSpec extends StreamSpec {
expectMsg(Some(1))
queue.pull().pipeTo(testActor)
expectNoMsg() // element requested but buffer empty
expectNoMsg(200.millis) // element requested but buffer empty
sub.sendNext(2)
expectMsg(Some(2))

View file

@ -14,12 +14,18 @@ import scala.concurrent._
import akka.Done
import akka.stream.testkit.{ StreamSpec, TestSubscriber, TestSourceStage, GraphStageMessages }
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.time.Span
class QueueSourceSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val pause = 300.millis
// more frequent checks than defaults from AkkaSpec
implicit val testPatience = PatienceConfig(
testKitSettings.DefaultTimeout.duration,
Span(5, org.scalatest.time.Millis))
def assertSuccess(f: Future[QueueOfferResult]): Unit = {
f.futureValue should ===(QueueOfferResult.Enqueued)
}

View file

@ -7,13 +7,14 @@ import scala.concurrent.duration._
import akka.stream.{ ClosedShape, ActorMaterializer }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.testkit.TimingTest
class TickSourceSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
"A Flow based on tick publisher" must {
"produce ticks" in assertAllStagesStopped {
"produce ticks" taggedAs TimingTest in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[String]()
Source.tick(1.second, 1.second, "tick").to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()
@ -25,7 +26,7 @@ class TickSourceSpec extends StreamSpec {
c.expectNoMsg(200.millis)
}
"drop ticks when not requested" in {
"drop ticks when not requested" taggedAs TimingTest in {
val c = TestSubscriber.manualProbe[String]()
Source.tick(1.second, 1.second, "tick").to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()
@ -42,7 +43,7 @@ class TickSourceSpec extends StreamSpec {
c.expectNoMsg(200.millis)
}
"reject multiple subscribers, but keep the first" in {
"reject multiple subscribers, but keep the first" taggedAs TimingTest in {
val p = Source.tick(1.second, 1.second, "tick").runWith(Sink.asPublisher(false))
val c1 = TestSubscriber.manualProbe[String]()
val c2 = TestSubscriber.manualProbe[String]()
@ -58,7 +59,7 @@ class TickSourceSpec extends StreamSpec {
sub1.cancel()
}
"be usable with zip for a simple form of rate limiting" in {
"be usable with zip for a simple form of rate limiting" taggedAs TimingTest in {
val c = TestSubscriber.manualProbe[Int]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
@ -79,7 +80,7 @@ class TickSourceSpec extends StreamSpec {
sub.cancel()
}
"be possible to cancel" in assertAllStagesStopped {
"be possible to cancel" taggedAs TimingTest in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[String]()
val tickSource = Source.tick(1.second, 1.second, "tick")
val cancellable = tickSource.to(Sink.fromSubscriber(c)).run()
@ -95,7 +96,7 @@ class TickSourceSpec extends StreamSpec {
c.expectComplete()
}
"acknowledge cancellation only once" in assertAllStagesStopped {
"acknowledge cancellation only once" taggedAs TimingTest in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[String]()
val cancellable = Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()
@ -106,7 +107,7 @@ class TickSourceSpec extends StreamSpec {
c.expectComplete()
}
"have isCancelled mirror the cancellation state" in assertAllStagesStopped {
"have isCancelled mirror the cancellation state" taggedAs TimingTest in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[String]()
val cancellable = Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()

View file

@ -227,7 +227,7 @@ object Source {
* `create` factory is never called and the materialized `CompletionStage` is failed.
*/
def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] =
scaladsl.Source.lazily[T, M](() => create.create().asScala).mapMaterializedValue(_.toJava).asJava
scaladsl.Source.lazily[T, M](() create.create().asScala).mapMaterializedValue(_.toJava).asJava
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]

View file

@ -360,7 +360,7 @@ object Source {
* the materialized future is completed with its value, if downstream cancels or fails without any demand the
* create factory is never called and the materialized `Future` is failed.
*/
def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] =
def lazily[T, M](create: () Source[T, M]): Source[T, Future[M]] =
Source.fromGraph(new LazySource[T, M](create))
/**

View file

@ -83,13 +83,16 @@ private[akka] trait MetricsKit extends MetricsKitOps {
clearMetrics()
}
def reportMetricsEnabled: Boolean = true
/**
* Causes immediate flush of metrics, using all registered reporters.
*
* HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting.
*/
def reportMetrics() {
reporters foreach { _.report() }
if (reportMetricsEnabled)
reporters foreach { _.report() }
}
/**

View file

@ -54,10 +54,12 @@ private[akka] trait MetricsKitOps extends MetricKeyDSL {
registry.histogram((key / "histogram").toString)
}
def forceGcEnabled: Boolean = true
/** Yet another delegate to `System.gc()` */
def gc() {
// todo add some form of logging, to differentiate manual gc calls from "normal" ones
System.gc()
if (forceGcEnabled)
System.gc()
}
/**

View file

@ -12,6 +12,11 @@ import sbt._
import sbt.Keys._
object MultiNode extends AutoPlugin {
// MultiJvm tests can be excluded from normal test target an validatePullRequest
// with -Dakka.test.multi-in-test=false
val multiNodeTestInTest: Boolean =
System.getProperty("akka.test.multi-in-test", "true") == "true"
object CliOptions {
val multiNode = CliOption("akka.test.multi-node", false)
@ -60,19 +65,21 @@ object MultiNode extends AutoPlugin {
CliOptions.hostsFileName.map(multiNodeHostsFileName in MultiJvm := _) ++
CliOptions.javaName.map(multiNodeJavaName in MultiJvm := _) ++
CliOptions.targetDirName.map(multiNodeTargetDirName in MultiJvm := _) ++
// make sure that MultiJvm tests are executed by the default test target,
// and combine the results from ordinary test and multi-jvm tests
(executeTests in Test <<= (executeTests in Test, multiExecuteTests) map {
case (testResults, multiNodeResults) =>
val overall =
if (testResults.overall.id < multiNodeResults.overall.id)
multiNodeResults.overall
else
testResults.overall
Tests.Output(overall,
testResults.events ++ multiNodeResults.events,
testResults.summaries ++ multiNodeResults.summaries)
})
(if (multiNodeTestInTest) {
// make sure that MultiJvm tests are executed by the default test target,
// and combine the results from ordinary test and multi-jvm tests
(executeTests in Test <<= (executeTests in Test, multiExecuteTests) map {
case (testResults, multiNodeResults) =>
val overall =
if (testResults.overall.id < multiNodeResults.overall.id)
multiNodeResults.overall
else
testResults.overall
Tests.Output(overall,
testResults.events ++ multiNodeResults.events,
testResults.summaries ++ multiNodeResults.summaries)
})
} else Nil)
}
/**

View file

@ -264,9 +264,9 @@ object MultiNodeWithPrValidation extends AutoPlugin {
override def trigger = allRequirements
override def requires = ValidatePullRequest && MultiNode
override lazy val projectSettings = Seq(
additionalTasks in ValidatePR += MultiNode.multiTest
)
override lazy val projectSettings =
if (MultiNode.multiNodeTestInTest) Seq(additionalTasks in ValidatePR += MultiNode.multiTest)
else Nil
}
/**