diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index f60a3ac36e..6ccdb2e620 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -4,14 +4,18 @@ package akka.event +import akka.Done + import scala.concurrent.duration._ - import com.typesafe.config.ConfigFactory -import language.postfixOps +import language.postfixOps import akka.actor._ import akka.testkit.{ AkkaSpec, GHExcludeTest, TestProbe } +import scala.concurrent.{ ExecutionContext, Promise } +import scala.util.Try + object EventStreamSpec { val config = ConfigFactory.parseString(""" @@ -71,6 +75,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { import EventStreamSpec._ val impl = system.asInstanceOf[ActorSystemImpl] + implicit val ec: ExecutionContext = system.dispatcher "An EventStream" must { @@ -337,13 +342,26 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { es.subscribe(target, classOf[A]) should ===(true) fishForDebugMessage(a2, s"unsubscribing $target from all channels") - es.subscribe(target, classOf[A]) should ===(true) - fishForDebugMessage(a2, s"unsubscribing $target from all channels") + after(30.millis) { + es.subscribe(target, classOf[A]) should ===(true) + fishForDebugMessage(a2, s"unsubscribing $target from all channels") + } } finally { shutdown(sys) } } + def after(delay: FiniteDuration)(block: => Unit): Unit = { + val done = Promise[Done]() + system.scheduler.scheduleOnce(delay) { + done.complete(Try { + block + Done + }) + } + done.future.futureValue + } + "not allow initializing a TerminatedUnsubscriber twice" in { val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config) // initializes an TerminatedUnsubscriber during start