diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala index 6170fc5782..bc764fa88f 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala @@ -17,12 +17,12 @@ import akka.actor.typed.{ ExtensionId, Logger, Props, - Settings, - Terminated + Settings } import akka.annotation.InternalApi import akka.util.Timeout import akka.{ actor => untyped } +import akka.Done import com.typesafe.config.ConfigFactory import scala.compat.java8.FutureConverters import scala.concurrent._ @@ -73,13 +73,10 @@ import akka.actor.typed.internal.InternalRecipientRef override def scheduler: untyped.Scheduler = throw new UnsupportedOperationException("no scheduler") - private val terminationPromise = Promise[Terminated] - override def terminate(): Future[akka.actor.typed.Terminated] = { - terminationPromise.trySuccess(Terminated(this)) - terminationPromise.future - } - override def whenTerminated: Future[akka.actor.typed.Terminated] = terminationPromise.future - override def getWhenTerminated: CompletionStage[Terminated] = FutureConverters.toJava(whenTerminated) + private val terminationPromise = Promise[Done] + override def terminate(): Unit = terminationPromise.trySuccess(Done) + override def whenTerminated: Future[Done] = terminationPromise.future + override def getWhenTerminated: CompletionStage[Done] = FutureConverters.toJava(whenTerminated) override val startTime: Long = System.currentTimeMillis() override def uptime: Long = System.currentTimeMillis() - startTime override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory { diff --git a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala index f146135511..63e7c0902e 100644 --- a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala +++ b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala @@ -4,8 +4,9 @@ package akka.actor.testkit.typed.scaladsl -import scala.concurrent.Promise +import akka.Done +import scala.concurrent.Promise import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.Behaviors import org.scalatest.BeforeAndAfterAll @@ -60,7 +61,7 @@ class ActorTestKitSpec extends ScalaTestWithActorTestKit with WordSpecLike { // usually done in test framework hook method but we want to assert val testkit2 = ActorTestKit() testkit2.shutdownTestKit() - testkit2.system.whenTerminated.futureValue shouldBe a[Terminated] + testkit2.system.whenTerminated.futureValue shouldBe a[Done] } } diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java index 0c07613ba6..d1219d8fbf 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java @@ -5,6 +5,7 @@ /** Copyright (C) 2009-2018 Lightbend Inc. */ package akka.actor.typed; +import akka.Done; import org.junit.Test; import org.scalatest.junit.JUnitSuite; @@ -17,15 +18,16 @@ public class ActorSystemTest extends JUnitSuite { @Test public void testGetWhenTerminated() throws Exception { - final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem"); + final ActorSystem system = + ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem"); system.terminate(); - final CompletionStage cs = system.getWhenTerminated(); + final CompletionStage cs = system.getWhenTerminated(); cs.toCompletableFuture().get(2, SECONDS); } @Test public void testGetWhenTerminatedWithoutTermination() { - final ActorSystem system = + final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedWithoutTermination"); assertFalse(system.getWhenTerminated().toCompletableFuture().isDone()); } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala index 239fd63808..aae630cb7c 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala @@ -264,6 +264,9 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with WordSpecLike { } try f(sys) - finally sys.terminate().futureValue + finally { + sys.terminate() + sys.whenTerminated.futureValue + } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala index 07362754fb..89897fd5a5 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala @@ -27,11 +27,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with case class Probe(message: String, replyTo: ActorRef[String]) def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)( - block: ActorSystem[T] => Unit): Terminated = { + block: ActorSystem[T] ⇒ Unit): Unit = { val sys = system(behavior, s"$suite-$name") try { block(sys) - if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue + if (doTerminate) { + sys.terminate() + sys.whenTerminated.futureValue + } } catch { case NonFatal(ex) => sys.terminate() @@ -41,20 +44,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with "An ActorSystem" must { "start the guardian actor and terminate when it terminates" in { - val t = withSystem("a", Behaviors.receiveMessage[Probe] { p => + withSystem("a", Behaviors.receiveMessage[Probe] { p => p.replyTo ! p.message Behaviors.stopped - }, doTerminate = false) { sys => + }, doTerminate = false) { sys ⇒ val inbox = TestInbox[String]("a") sys ! Probe("hello", inbox.ref) eventually { inbox.hasMessages should ===(true) } inbox.receiveAll() should ===("hello" :: Nil) + sys.whenTerminated.futureValue } - val p = t.ref.path - p.name should ===("/") - p.address.system should ===(suite + "-a") } // see issue #24172 @@ -86,7 +87,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with inbox.receiveAll() should ===("started" :: Nil) // now we know that the guardian has started, and should receive PostStop - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue inbox.receiveAll() should ===("done" :: Nil) } @@ -97,7 +99,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with // for this case the guardian might not have been started before // the system terminates and then it will not receive PostStop, which // is OK since it wasn't really started yet - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue } "log to the event stream" in { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala index 771b3d2661..6e8819b941 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala @@ -44,7 +44,8 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik printer ! PrintMe("not message 2") // #fire-and-forget-doit - system.terminate().futureValue + system.terminate() + system.whenTerminated.futureValue } "contain a sample for request response" in { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 3ac6d4a2b5..3946027c1c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -4,12 +4,13 @@ package akka.actor.typed -import akka.{ actor => untyped } import java.util.concurrent.CompletionStage import java.util.concurrent.ThreadFactory import scala.concurrent.ExecutionContextExecutor import scala.concurrent.Future +import akka.Done +import akka.{ actor => untyped } import akka.actor.BootstrapSetup import akka.actor.setup.ActorSystemSetup import akka.actor.typed.internal.InternalRecipientRef @@ -106,20 +107,25 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter * Terminates this actor system. This will stop the guardian actor, which in turn * will recursively stop all its child actors, then the system guardian * (below which the logging actors reside). + * + * This is an asynchronous operation and completion of the termination can + * be observed with [[ActorSystem.whenTerminated]] or [[ActorSystem.getWhenTerminated]]. */ - def terminate(): Future[Terminated] + def terminate(): Unit /** - * Returns a Future which will be completed after the ActorSystem has been terminated - * and termination hooks have been executed. + * Scala API: Returns a Future which will be completed after the ActorSystem has been terminated + * and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]] + * or by stopping the guardian actor. */ - def whenTerminated: Future[Terminated] + def whenTerminated: Future[Done] /** - * Returns a CompletionStage which will be completed after the ActorSystem has been terminated - * and termination hooks have been executed. + * Java API: Returns a CompletionStage which will be completed after the ActorSystem has been terminated + * and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]] + * or by stopping the guardian actor. */ - def getWhenTerminated: CompletionStage[Terminated] + def getWhenTerminated: CompletionStage[Done] /** * The deadLetter address is a destination that will accept (and discard) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala index 798882cf76..9514f515a9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -9,6 +9,7 @@ package adapter import java.util.concurrent.CompletionStage import akka.actor +import akka.Done import akka.actor.ExtendedActorSystem import akka.actor.InvalidMessageException import akka.{ actor => untyped } @@ -91,11 +92,10 @@ import akka.event.LoggingFilterWithMarker import akka.dispatch.ExecutionContexts.sameThreadExecutionContext - override def terminate(): scala.concurrent.Future[akka.actor.typed.Terminated] = - untypedSystem.terminate().map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext) - override lazy val whenTerminated: scala.concurrent.Future[akka.actor.typed.Terminated] = - untypedSystem.whenTerminated.map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext) - override lazy val getWhenTerminated: CompletionStage[akka.actor.typed.Terminated] = + override def terminate(): Unit = untypedSystem.terminate() + override lazy val whenTerminated: scala.concurrent.Future[akka.Done] = + untypedSystem.whenTerminated.map(_ => Done)(sameThreadExecutionContext) + override lazy val getWhenTerminated: CompletionStage[akka.Done] = FutureConverters.toJava(whenTerminated) def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)( diff --git a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java index 88d0be86cf..1c113ddb71 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java @@ -11,8 +11,8 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; public class ClusterApiTest extends JUnitSuite { @@ -56,8 +56,10 @@ public class ClusterApiTest extends JUnitSuite { probe2.expectMessageClass(SelfRemoved.class); } finally { - // TODO no java API to terminate actor system - Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds")); + system1.terminate(); + system1.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); + system2.terminate(); + system2.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); } } } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala index e8445fbe85..3f4c4af153 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala @@ -38,11 +38,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with case class Probe(message: String, replyTo: ActorRef[String]) def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)( - block: ActorSystem[T] => Unit): Terminated = { + block: ActorSystem[T] => Unit): Unit = { val sys = system(behavior, s"$suite-$name") try { block(sys) - if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue + if (doTerminate) { + sys.terminate() + sys.whenTerminated.futureValue + } } catch { case NonFatal(ex) => sys.terminate() @@ -52,20 +55,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with "An ActorSystem" must { "start the guardian actor and terminate when it terminates" in { - val t = withSystem("a", Behaviors.receiveMessage[Probe] { p => + withSystem("a", Behaviors.receiveMessage[Probe] { p => p.replyTo ! p.message Behaviors.stopped - }, doTerminate = false) { sys => + }, doTerminate = false) { sys ⇒ val inbox = TestInbox[String]("a") sys ! Probe("hello", inbox.ref) eventually { inbox.hasMessages should ===(true) } inbox.receiveAll() should ===("hello" :: Nil) + sys.whenTerminated.futureValue } - val p = t.ref.path - p.name should ===("/") - p.address.system should ===(suite + "-a") } // see issue #24172 @@ -97,7 +98,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with inbox.receiveAll() should ===("started" :: Nil) // now we know that the guardian has started, and should receive PostStop - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue inbox.receiveAll() should ===("done" :: Nil) } @@ -108,7 +110,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with // for this case the guardian might not have been started before // the system terminates and then it will not receive PostStop, which // is OK since it wasn't really started yet - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue } "log to the event stream" in { diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 2a5828e592..a83e111092 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -185,7 +185,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { if (down) { // abrupt termination - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) clusterNode1.manager ! Down(clusterNode2.selfMember.address) } else { clusterNode1.manager ! Leave(clusterNode2.selfMember.address) @@ -295,7 +296,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { regProbe1.expectMessage(Pong) // abrupt termination - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) clusterNode1.manager ! Down(clusterNode2.selfMember.address) regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) @@ -337,7 +339,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { // abrupt termination but then a node with the same host:port comes online quickly system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) val testKit3 = ActorTestKit( system1.name, @@ -447,7 +450,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { // abrupt termination but then a node with the same host:port comes online quickly system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) val testKit3 = ActorTestKit( system1.name, diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala index f5bad04fbf..48897892db 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala @@ -71,8 +71,10 @@ class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually val cluster1 = Cluster(system1) val cluster2 = Cluster(system2) } finally { - system1.terminate().futureValue - system2.terminate().futureValue + system1.terminate() + system1.whenTerminated.futureValue + system2.terminate() + system2.whenTerminated.futureValue } } @@ -139,8 +141,10 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually cluster2.isTerminated shouldEqual true } } finally { - system.terminate().futureValue - system2.terminate().futureValue + system.terminate() + system.whenTerminated.futureValue + system2.terminate() + system2.whenTerminated.futureValue } } @@ -220,9 +224,12 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually system3.whenTerminated.futureValue } finally { - system1.terminate().futureValue - system2.terminate().futureValue - system3.terminate().futureValue + system1.terminate() + system1.whenTerminated.futureValue + system2.terminate() + system2.whenTerminated.futureValue + system3.terminate() + system3.whenTerminated.futureValue } } }