fix EventStreamSpec instability

fixes #30460
This commit is contained in:
Arnout Engelen 2021-08-04 16:58:16 +02:00
parent 60e8225059
commit 04bf969d04
No known key found for this signature in database
GPG key ID: 061107B0F74A6DAA

View file

@ -4,14 +4,18 @@
package akka.event
import akka.Done
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import language.postfixOps
import language.postfixOps
import akka.actor._
import akka.testkit.{ AkkaSpec, GHExcludeTest, TestProbe }
import scala.concurrent.{ ExecutionContext, Promise }
import scala.util.Try
object EventStreamSpec {
val config = ConfigFactory.parseString("""
@ -71,6 +75,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
import EventStreamSpec._
val impl = system.asInstanceOf[ActorSystemImpl]
implicit val ec: ExecutionContext = system.dispatcher
"An EventStream" must {
@ -337,13 +342,26 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
es.subscribe(target, classOf[A]) should ===(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
es.subscribe(target, classOf[A]) should ===(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
after(30.millis) {
es.subscribe(target, classOf[A]) should ===(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
}
} finally {
shutdown(sys)
}
}
def after(delay: FiniteDuration)(block: => Unit): Unit = {
val done = Promise[Done]()
system.scheduler.scheduleOnce(delay) {
done.complete(Try {
block
Done
})
}
done.future.futureValue
}
"not allow initializing a TerminatedUnsubscriber twice" in {
val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config)
// initializes an TerminatedUnsubscriber during start