From 7cc6266ad029d2f7ad7a61e5c695bb3fd3113e2b Mon Sep 17 00:00:00 2001 From: Tzu-Chiao Yeh Date: Thu, 20 Sep 2018 17:08:09 +0800 Subject: [PATCH] Change typed whenTerminated signature to Future[Done], #25647 This change ignore the terminated passed from untyped and map it into Done, with some minor changes for testing termination. termiate() returns Unit to not bias it towards the Scala API, completion can be observed with whenTerminated or getWhenTerminated --- .../typed/internal/ActorSystemStub.scala | 15 +++++-------- .../typed/scaladsl/ActorTestKitSpec.scala | 5 +++-- .../akka/actor/typed/ActorSystemTest.java | 8 ++++--- .../akka/actor/typed/ExtensionsSpec.scala | 5 ++++- .../typed/internal/ActorSystemSpec.scala | 21 ++++++++++-------- .../akka/typed/InteractionPatternsSpec.scala | 3 ++- .../scala/akka/actor/typed/ActorSystem.scala | 22 ++++++++++++------- .../internal/adapter/ActorSystemAdapter.scala | 10 ++++----- .../akka/cluster/typed/ClusterApiTest.java | 10 +++++---- .../akka/cluster/typed/ActorSystemSpec.scala | 21 ++++++++++-------- .../ClusterReceptionistSpec.scala | 12 ++++++---- .../typed/BasicClusterExampleSpec.scala | 21 ++++++++++++------ 12 files changed, 91 insertions(+), 62 deletions(-) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala index 6170fc5782..bc764fa88f 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/ActorSystemStub.scala @@ -17,12 +17,12 @@ import akka.actor.typed.{ ExtensionId, Logger, Props, - Settings, - Terminated + Settings } import akka.annotation.InternalApi import akka.util.Timeout import akka.{ actor => untyped } +import akka.Done import com.typesafe.config.ConfigFactory import scala.compat.java8.FutureConverters import scala.concurrent._ @@ -73,13 +73,10 @@ import akka.actor.typed.internal.InternalRecipientRef override def scheduler: untyped.Scheduler = throw new UnsupportedOperationException("no scheduler") - private val terminationPromise = Promise[Terminated] - override def terminate(): Future[akka.actor.typed.Terminated] = { - terminationPromise.trySuccess(Terminated(this)) - terminationPromise.future - } - override def whenTerminated: Future[akka.actor.typed.Terminated] = terminationPromise.future - override def getWhenTerminated: CompletionStage[Terminated] = FutureConverters.toJava(whenTerminated) + private val terminationPromise = Promise[Done] + override def terminate(): Unit = terminationPromise.trySuccess(Done) + override def whenTerminated: Future[Done] = terminationPromise.future + override def getWhenTerminated: CompletionStage[Done] = FutureConverters.toJava(whenTerminated) override val startTime: Long = System.currentTimeMillis() override def uptime: Long = System.currentTimeMillis() - startTime override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory { diff --git a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala index f146135511..63e7c0902e 100644 --- a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala +++ b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/ActorTestKitSpec.scala @@ -4,8 +4,9 @@ package akka.actor.testkit.typed.scaladsl -import scala.concurrent.Promise +import akka.Done +import scala.concurrent.Promise import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.Behaviors import org.scalatest.BeforeAndAfterAll @@ -60,7 +61,7 @@ class ActorTestKitSpec extends ScalaTestWithActorTestKit with WordSpecLike { // usually done in test framework hook method but we want to assert val testkit2 = ActorTestKit() testkit2.shutdownTestKit() - testkit2.system.whenTerminated.futureValue shouldBe a[Terminated] + testkit2.system.whenTerminated.futureValue shouldBe a[Done] } } diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java index 0c07613ba6..d1219d8fbf 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ActorSystemTest.java @@ -5,6 +5,7 @@ /** Copyright (C) 2009-2018 Lightbend Inc. */ package akka.actor.typed; +import akka.Done; import org.junit.Test; import org.scalatest.junit.JUnitSuite; @@ -17,15 +18,16 @@ public class ActorSystemTest extends JUnitSuite { @Test public void testGetWhenTerminated() throws Exception { - final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem"); + final ActorSystem system = + ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem"); system.terminate(); - final CompletionStage cs = system.getWhenTerminated(); + final CompletionStage cs = system.getWhenTerminated(); cs.toCompletableFuture().get(2, SECONDS); } @Test public void testGetWhenTerminatedWithoutTermination() { - final ActorSystem system = + final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedWithoutTermination"); assertFalse(system.getWhenTerminated().toCompletableFuture().isDone()); } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala index 239fd63808..aae630cb7c 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala @@ -264,6 +264,9 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with WordSpecLike { } try f(sys) - finally sys.terminate().futureValue + finally { + sys.terminate() + sys.whenTerminated.futureValue + } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala index 07362754fb..89897fd5a5 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala @@ -27,11 +27,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with case class Probe(message: String, replyTo: ActorRef[String]) def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)( - block: ActorSystem[T] => Unit): Terminated = { + block: ActorSystem[T] ⇒ Unit): Unit = { val sys = system(behavior, s"$suite-$name") try { block(sys) - if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue + if (doTerminate) { + sys.terminate() + sys.whenTerminated.futureValue + } } catch { case NonFatal(ex) => sys.terminate() @@ -41,20 +44,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with "An ActorSystem" must { "start the guardian actor and terminate when it terminates" in { - val t = withSystem("a", Behaviors.receiveMessage[Probe] { p => + withSystem("a", Behaviors.receiveMessage[Probe] { p => p.replyTo ! p.message Behaviors.stopped - }, doTerminate = false) { sys => + }, doTerminate = false) { sys ⇒ val inbox = TestInbox[String]("a") sys ! Probe("hello", inbox.ref) eventually { inbox.hasMessages should ===(true) } inbox.receiveAll() should ===("hello" :: Nil) + sys.whenTerminated.futureValue } - val p = t.ref.path - p.name should ===("/") - p.address.system should ===(suite + "-a") } // see issue #24172 @@ -86,7 +87,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with inbox.receiveAll() should ===("started" :: Nil) // now we know that the guardian has started, and should receive PostStop - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue inbox.receiveAll() should ===("done" :: Nil) } @@ -97,7 +99,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with // for this case the guardian might not have been started before // the system terminates and then it will not receive PostStop, which // is OK since it wasn't really started yet - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue } "log to the event stream" in { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala index 771b3d2661..6e8819b941 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala @@ -44,7 +44,8 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik printer ! PrintMe("not message 2") // #fire-and-forget-doit - system.terminate().futureValue + system.terminate() + system.whenTerminated.futureValue } "contain a sample for request response" in { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 3ac6d4a2b5..3946027c1c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -4,12 +4,13 @@ package akka.actor.typed -import akka.{ actor => untyped } import java.util.concurrent.CompletionStage import java.util.concurrent.ThreadFactory import scala.concurrent.ExecutionContextExecutor import scala.concurrent.Future +import akka.Done +import akka.{ actor => untyped } import akka.actor.BootstrapSetup import akka.actor.setup.ActorSystemSetup import akka.actor.typed.internal.InternalRecipientRef @@ -106,20 +107,25 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter * Terminates this actor system. This will stop the guardian actor, which in turn * will recursively stop all its child actors, then the system guardian * (below which the logging actors reside). + * + * This is an asynchronous operation and completion of the termination can + * be observed with [[ActorSystem.whenTerminated]] or [[ActorSystem.getWhenTerminated]]. */ - def terminate(): Future[Terminated] + def terminate(): Unit /** - * Returns a Future which will be completed after the ActorSystem has been terminated - * and termination hooks have been executed. + * Scala API: Returns a Future which will be completed after the ActorSystem has been terminated + * and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]] + * or by stopping the guardian actor. */ - def whenTerminated: Future[Terminated] + def whenTerminated: Future[Done] /** - * Returns a CompletionStage which will be completed after the ActorSystem has been terminated - * and termination hooks have been executed. + * Java API: Returns a CompletionStage which will be completed after the ActorSystem has been terminated + * and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]] + * or by stopping the guardian actor. */ - def getWhenTerminated: CompletionStage[Terminated] + def getWhenTerminated: CompletionStage[Done] /** * The deadLetter address is a destination that will accept (and discard) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala index 798882cf76..9514f515a9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -9,6 +9,7 @@ package adapter import java.util.concurrent.CompletionStage import akka.actor +import akka.Done import akka.actor.ExtendedActorSystem import akka.actor.InvalidMessageException import akka.{ actor => untyped } @@ -91,11 +92,10 @@ import akka.event.LoggingFilterWithMarker import akka.dispatch.ExecutionContexts.sameThreadExecutionContext - override def terminate(): scala.concurrent.Future[akka.actor.typed.Terminated] = - untypedSystem.terminate().map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext) - override lazy val whenTerminated: scala.concurrent.Future[akka.actor.typed.Terminated] = - untypedSystem.whenTerminated.map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext) - override lazy val getWhenTerminated: CompletionStage[akka.actor.typed.Terminated] = + override def terminate(): Unit = untypedSystem.terminate() + override lazy val whenTerminated: scala.concurrent.Future[akka.Done] = + untypedSystem.whenTerminated.map(_ => Done)(sameThreadExecutionContext) + override lazy val getWhenTerminated: CompletionStage[akka.Done] = FutureConverters.toJava(whenTerminated) def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)( diff --git a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java index 88d0be86cf..1c113ddb71 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java @@ -11,8 +11,8 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; public class ClusterApiTest extends JUnitSuite { @@ -56,8 +56,10 @@ public class ClusterApiTest extends JUnitSuite { probe2.expectMessageClass(SelfRemoved.class); } finally { - // TODO no java API to terminate actor system - Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds")); + system1.terminate(); + system1.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); + system2.terminate(); + system2.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS); } } } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala index e8445fbe85..3f4c4af153 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala @@ -38,11 +38,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with case class Probe(message: String, replyTo: ActorRef[String]) def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)( - block: ActorSystem[T] => Unit): Terminated = { + block: ActorSystem[T] => Unit): Unit = { val sys = system(behavior, s"$suite-$name") try { block(sys) - if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue + if (doTerminate) { + sys.terminate() + sys.whenTerminated.futureValue + } } catch { case NonFatal(ex) => sys.terminate() @@ -52,20 +55,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with "An ActorSystem" must { "start the guardian actor and terminate when it terminates" in { - val t = withSystem("a", Behaviors.receiveMessage[Probe] { p => + withSystem("a", Behaviors.receiveMessage[Probe] { p => p.replyTo ! p.message Behaviors.stopped - }, doTerminate = false) { sys => + }, doTerminate = false) { sys ⇒ val inbox = TestInbox[String]("a") sys ! Probe("hello", inbox.ref) eventually { inbox.hasMessages should ===(true) } inbox.receiveAll() should ===("hello" :: Nil) + sys.whenTerminated.futureValue } - val p = t.ref.path - p.name should ===("/") - p.address.system should ===(suite + "-a") } // see issue #24172 @@ -97,7 +98,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with inbox.receiveAll() should ===("started" :: Nil) // now we know that the guardian has started, and should receive PostStop - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue inbox.receiveAll() should ===("done" :: Nil) } @@ -108,7 +110,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with // for this case the guardian might not have been started before // the system terminates and then it will not receive PostStop, which // is OK since it wasn't really started yet - sys.terminate().futureValue + sys.terminate() + sys.whenTerminated.futureValue } "log to the event stream" in { diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 2a5828e592..a83e111092 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -185,7 +185,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { if (down) { // abrupt termination - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) clusterNode1.manager ! Down(clusterNode2.selfMember.address) } else { clusterNode1.manager ! Leave(clusterNode2.selfMember.address) @@ -295,7 +296,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { regProbe1.expectMessage(Pong) // abrupt termination - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) clusterNode1.manager ! Down(clusterNode2.selfMember.address) regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) @@ -337,7 +339,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { // abrupt termination but then a node with the same host:port comes online quickly system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) val testKit3 = ActorTestKit( system1.name, @@ -447,7 +450,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { // abrupt termination but then a node with the same host:port comes online quickly system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) - Await.ready(system2.terminate(), 10.seconds) + system2.terminate() + Await.ready(system2.whenTerminated, 10.seconds) val testKit3 = ActorTestKit( system1.name, diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala index f5bad04fbf..48897892db 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala @@ -71,8 +71,10 @@ class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually val cluster1 = Cluster(system1) val cluster2 = Cluster(system2) } finally { - system1.terminate().futureValue - system2.terminate().futureValue + system1.terminate() + system1.whenTerminated.futureValue + system2.terminate() + system2.whenTerminated.futureValue } } @@ -139,8 +141,10 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually cluster2.isTerminated shouldEqual true } } finally { - system.terminate().futureValue - system2.terminate().futureValue + system.terminate() + system.whenTerminated.futureValue + system2.terminate() + system2.whenTerminated.futureValue } } @@ -220,9 +224,12 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually system3.whenTerminated.futureValue } finally { - system1.terminate().futureValue - system2.terminate().futureValue - system3.terminate().futureValue + system1.terminate() + system1.whenTerminated.futureValue + system2.terminate() + system2.whenTerminated.futureValue + system3.terminate() + system3.whenTerminated.futureValue } } }