Add overridden duration timeout to StreamTestKit (#1468)

* Add overridden duration timeout to StreamTestKit

* add license to file

* Update pr-1468-override-timeout-stream-testkit.backwards.excludes

---------

Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>
This commit is contained in:
Matthew de Detrich 2025-01-25 19:46:42 +01:00 committed by GitHub
parent ba639f68a2
commit f5d49d31c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 111 additions and 7 deletions

View file

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# StreamTestKit.assertNoChildren is an internal API and can be changed
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.testkit.scaladsl.StreamTestKit.assertNoChildren")

View file

@ -19,6 +19,10 @@ import pekko.stream.{ Materializer, SystemMaterializer }
import pekko.stream.impl.PhasedFusingActorMaterializer
import pekko.stream.testkit.scaladsl
import java.time.Duration
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
object StreamTestKit {
/**
@ -29,7 +33,21 @@ object StreamTestKit {
def assertAllStagesStopped(mat: Materializer): Unit =
mat match {
case impl: PhasedFusingActorMaterializer =>
scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor)
scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor, None)
case _ =>
}
/**
* Assert that there are no stages running under a given materializer.
* Usually this assertion is run after a test-case to check that all of the
* stages have terminated successfully with an overridden duration that ignores
* `stream.testkit.all-stages-stopped-timeout`.
*/
def assertAllStagesStopped(mat: Materializer, overrideTimeout: Duration): Unit =
mat match {
case impl: PhasedFusingActorMaterializer =>
scaladsl.StreamTestKit.assertNoChildren(impl.system, impl.supervisor,
Some(FiniteDuration(overrideTimeout.toMillis, TimeUnit.MILLISECONDS)))
case _ =>
}

View file

@ -35,12 +35,30 @@ object StreamTestKit {
* This assertion is useful to check that all of the stages have
* terminated successfully.
*/
def assertAllStagesStopped[T](block: => T, overrideTimeout: FiniteDuration)(implicit materializer: Materializer): T =
materializer match {
case impl: PhasedFusingActorMaterializer =>
stopAllChildren(impl.system, impl.supervisor)
val result = block
assertNoChildren(impl.system, impl.supervisor, Some(overrideTimeout))
result
case _ => block
}
/**
* Asserts that after the given code block is ran, no stages are left over
* that were created by the given materializer with an overridden duration
* that ignores `stream.testkit.all-stages-stopped-timeout`.
*
* This assertion is useful to check that all of the stages have
* terminated successfully.
*/
def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T =
materializer match {
case impl: PhasedFusingActorMaterializer =>
stopAllChildren(impl.system, impl.supervisor)
val result = block
assertNoChildren(impl.system, impl.supervisor)
assertNoChildren(impl.system, impl.supervisor, None)
result
case _ => block
}
@ -53,10 +71,15 @@ object StreamTestKit {
}
/** INTERNAL API */
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = {
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef,
overrideTimeout: Option[FiniteDuration]): Unit = {
val probe = TestProbe()(sys)
val c = sys.settings.config.getConfig("pekko.stream.testkit")
val timeout = c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis
val timeout = overrideTimeout.getOrElse {
val c = sys.settings.config.getConfig("pekko.stream.testkit")
c.getDuration("all-stages-stopped-timeout", MILLISECONDS).millis
}
probe.within(timeout) {
try probe.awaitAssert {
supervisor.tell(StreamSupervisor.GetChildren, probe.ref)

View file

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.stream.testkit
import org.apache.pekko.testkit.TestKitBase
import org.scalatest.time.{ Millis, Span }
import java.util.concurrent.TimeUnit
trait StreamConfiguration extends TestKitBase {
case class StreamConfig(allStagesStoppedTimeout: Span = Span({
val c = system.settings.config.getConfig("pekko.stream.testkit")
c.getDuration("all-stages-stopped-timeout", TimeUnit.MILLISECONDS)
}, Millis))
private val defaultStreamConfig = StreamConfig()
/**
* The default `StreamConfig` which is derived from the Actor System's `pekko.stream.testkit.all-stages-stopped-timeout`
* configuration value. If you want to provide a different StreamConfig for specific tests without having to re-specify
* `pekko.stream.testkit.all-stages-stopped-timeout` then you can override this value.
*/
implicit def streamConfig: StreamConfig = defaultStreamConfig
}

View file

@ -30,7 +30,10 @@ import org.scalatest.Failed
import com.typesafe.config.{ Config, ConfigFactory }
abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) {
import java.util.concurrent.TimeUnit
abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) with StreamConfiguration {
def this(config: Config) =
this(
ActorSystem(
@ -73,7 +76,8 @@ abstract class StreamSpec(_system: ActorSystem) extends PekkoSpec(_system) {
case impl: PhasedFusingActorMaterializer =>
stopAllChildren(impl.system, impl.supervisor)
val result = test.apply()
assertNoChildren(impl.system, impl.supervisor)
assertNoChildren(impl.system, impl.supervisor,
Some(FiniteDuration(streamConfig.allStagesStoppedTimeout.millisPart, TimeUnit.MILLISECONDS)))
result
case _ => other
}