Change typed whenTerminated signature to Future[Done], #25647

This change ignore the terminated passed from untyped and map it into Done,
with some minor changes for testing termination.

termiate() returns Unit to not bias it towards the Scala API, completion
can be observed with whenTerminated or getWhenTerminated
This commit is contained in:
Tzu-Chiao Yeh 2018-09-20 17:08:09 +08:00 committed by Patrik Nordwall
parent c06cf62b64
commit 7cc6266ad0
12 changed files with 91 additions and 62 deletions

View file

@ -17,12 +17,12 @@ import akka.actor.typed.{
ExtensionId, ExtensionId,
Logger, Logger,
Props, Props,
Settings, Settings
Terminated
} }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.util.Timeout import akka.util.Timeout
import akka.{ actor => untyped } import akka.{ actor => untyped }
import akka.Done
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.compat.java8.FutureConverters import scala.compat.java8.FutureConverters
import scala.concurrent._ import scala.concurrent._
@ -73,13 +73,10 @@ import akka.actor.typed.internal.InternalRecipientRef
override def scheduler: untyped.Scheduler = throw new UnsupportedOperationException("no scheduler") override def scheduler: untyped.Scheduler = throw new UnsupportedOperationException("no scheduler")
private val terminationPromise = Promise[Terminated] private val terminationPromise = Promise[Done]
override def terminate(): Future[akka.actor.typed.Terminated] = { override def terminate(): Unit = terminationPromise.trySuccess(Done)
terminationPromise.trySuccess(Terminated(this)) override def whenTerminated: Future[Done] = terminationPromise.future
terminationPromise.future override def getWhenTerminated: CompletionStage[Done] = FutureConverters.toJava(whenTerminated)
}
override def whenTerminated: Future[akka.actor.typed.Terminated] = terminationPromise.future
override def getWhenTerminated: CompletionStage[Terminated] = FutureConverters.toJava(whenTerminated)
override val startTime: Long = System.currentTimeMillis() override val startTime: Long = System.currentTimeMillis()
override def uptime: Long = System.currentTimeMillis() - startTime override def uptime: Long = System.currentTimeMillis() - startTime
override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory { override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory {

View file

@ -4,8 +4,9 @@
package akka.actor.testkit.typed.scaladsl package akka.actor.testkit.typed.scaladsl
import scala.concurrent.Promise import akka.Done
import scala.concurrent.Promise
import akka.actor.typed.Terminated import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
@ -60,7 +61,7 @@ class ActorTestKitSpec extends ScalaTestWithActorTestKit with WordSpecLike {
// usually done in test framework hook method but we want to assert // usually done in test framework hook method but we want to assert
val testkit2 = ActorTestKit() val testkit2 = ActorTestKit()
testkit2.shutdownTestKit() testkit2.shutdownTestKit()
testkit2.system.whenTerminated.futureValue shouldBe a[Terminated] testkit2.system.whenTerminated.futureValue shouldBe a[Done]
} }
} }

View file

@ -5,6 +5,7 @@
/** Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com> */ /** Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com> */
package akka.actor.typed; package akka.actor.typed;
import akka.Done;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
@ -17,15 +18,16 @@ public class ActorSystemTest extends JUnitSuite {
@Test @Test
public void testGetWhenTerminated() throws Exception { public void testGetWhenTerminated() throws Exception {
final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem"); final ActorSystem<Void> system =
ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem");
system.terminate(); system.terminate();
final CompletionStage<Terminated> cs = system.getWhenTerminated(); final CompletionStage<Done> cs = system.getWhenTerminated();
cs.toCompletableFuture().get(2, SECONDS); cs.toCompletableFuture().get(2, SECONDS);
} }
@Test @Test
public void testGetWhenTerminatedWithoutTermination() { public void testGetWhenTerminatedWithoutTermination() {
final ActorSystem system = final ActorSystem<Void> system =
ActorSystem.create(Behavior.empty(), "GetWhenTerminatedWithoutTermination"); ActorSystem.create(Behavior.empty(), "GetWhenTerminatedWithoutTermination");
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone()); assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
} }

View file

@ -264,6 +264,9 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with WordSpecLike {
} }
try f(sys) try f(sys)
finally sys.terminate().futureValue finally {
sys.terminate()
sys.whenTerminated.futureValue
}
} }
} }

View file

@ -27,11 +27,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
case class Probe(message: String, replyTo: ActorRef[String]) case class Probe(message: String, replyTo: ActorRef[String])
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)( def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(
block: ActorSystem[T] => Unit): Terminated = { block: ActorSystem[T] Unit): Unit = {
val sys = system(behavior, s"$suite-$name") val sys = system(behavior, s"$suite-$name")
try { try {
block(sys) block(sys)
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue if (doTerminate) {
sys.terminate()
sys.whenTerminated.futureValue
}
} catch { } catch {
case NonFatal(ex) => case NonFatal(ex) =>
sys.terminate() sys.terminate()
@ -41,20 +44,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
"An ActorSystem" must { "An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in { "start the guardian actor and terminate when it terminates" in {
val t = withSystem("a", Behaviors.receiveMessage[Probe] { p => withSystem("a", Behaviors.receiveMessage[Probe] { p =>
p.replyTo ! p.message p.replyTo ! p.message
Behaviors.stopped Behaviors.stopped
}, doTerminate = false) { sys => }, doTerminate = false) { sys
val inbox = TestInbox[String]("a") val inbox = TestInbox[String]("a")
sys ! Probe("hello", inbox.ref) sys ! Probe("hello", inbox.ref)
eventually { eventually {
inbox.hasMessages should ===(true) inbox.hasMessages should ===(true)
} }
inbox.receiveAll() should ===("hello" :: Nil) inbox.receiveAll() should ===("hello" :: Nil)
sys.whenTerminated.futureValue
} }
val p = t.ref.path
p.name should ===("/")
p.address.system should ===(suite + "-a")
} }
// see issue #24172 // see issue #24172
@ -86,7 +87,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
inbox.receiveAll() should ===("started" :: Nil) inbox.receiveAll() should ===("started" :: Nil)
// now we know that the guardian has started, and should receive PostStop // now we know that the guardian has started, and should receive PostStop
sys.terminate().futureValue sys.terminate()
sys.whenTerminated.futureValue
inbox.receiveAll() should ===("done" :: Nil) inbox.receiveAll() should ===("done" :: Nil)
} }
@ -97,7 +99,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// for this case the guardian might not have been started before // for this case the guardian might not have been started before
// the system terminates and then it will not receive PostStop, which // the system terminates and then it will not receive PostStop, which
// is OK since it wasn't really started yet // is OK since it wasn't really started yet
sys.terminate().futureValue sys.terminate()
sys.whenTerminated.futureValue
} }
"log to the event stream" in { "log to the event stream" in {

View file

@ -44,7 +44,8 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik
printer ! PrintMe("not message 2") printer ! PrintMe("not message 2")
// #fire-and-forget-doit // #fire-and-forget-doit
system.terminate().futureValue system.terminate()
system.whenTerminated.futureValue
} }
"contain a sample for request response" in { "contain a sample for request response" in {

View file

@ -4,12 +4,13 @@
package akka.actor.typed package akka.actor.typed
import akka.{ actor => untyped }
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import java.util.concurrent.ThreadFactory import java.util.concurrent.ThreadFactory
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future import scala.concurrent.Future
import akka.Done
import akka.{ actor => untyped }
import akka.actor.BootstrapSetup import akka.actor.BootstrapSetup
import akka.actor.setup.ActorSystemSetup import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.InternalRecipientRef
@ -106,20 +107,25 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter
* Terminates this actor system. This will stop the guardian actor, which in turn * Terminates this actor system. This will stop the guardian actor, which in turn
* will recursively stop all its child actors, then the system guardian * will recursively stop all its child actors, then the system guardian
* (below which the logging actors reside). * (below which the logging actors reside).
*
* This is an asynchronous operation and completion of the termination can
* be observed with [[ActorSystem.whenTerminated]] or [[ActorSystem.getWhenTerminated]].
*/ */
def terminate(): Future[Terminated] def terminate(): Unit
/** /**
* Returns a Future which will be completed after the ActorSystem has been terminated * Scala API: Returns a Future which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed. * and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]]
* or by stopping the guardian actor.
*/ */
def whenTerminated: Future[Terminated] def whenTerminated: Future[Done]
/** /**
* Returns a CompletionStage which will be completed after the ActorSystem has been terminated * Java API: Returns a CompletionStage which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed. * and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]]
* or by stopping the guardian actor.
*/ */
def getWhenTerminated: CompletionStage[Terminated] def getWhenTerminated: CompletionStage[Done]
/** /**
* The deadLetter address is a destination that will accept (and discard) * The deadLetter address is a destination that will accept (and discard)

View file

@ -9,6 +9,7 @@ package adapter
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import akka.actor import akka.actor
import akka.Done
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.InvalidMessageException import akka.actor.InvalidMessageException
import akka.{ actor => untyped } import akka.{ actor => untyped }
@ -91,11 +92,10 @@ import akka.event.LoggingFilterWithMarker
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
override def terminate(): scala.concurrent.Future[akka.actor.typed.Terminated] = override def terminate(): Unit = untypedSystem.terminate()
untypedSystem.terminate().map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext) override lazy val whenTerminated: scala.concurrent.Future[akka.Done] =
override lazy val whenTerminated: scala.concurrent.Future[akka.actor.typed.Terminated] = untypedSystem.whenTerminated.map(_ => Done)(sameThreadExecutionContext)
untypedSystem.whenTerminated.map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext) override lazy val getWhenTerminated: CompletionStage[akka.Done] =
override lazy val getWhenTerminated: CompletionStage[akka.actor.typed.Terminated] =
FutureConverters.toJava(whenTerminated) FutureConverters.toJava(whenTerminated)
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)( def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)(

View file

@ -11,8 +11,8 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit;
public class ClusterApiTest extends JUnitSuite { public class ClusterApiTest extends JUnitSuite {
@ -56,8 +56,10 @@ public class ClusterApiTest extends JUnitSuite {
probe2.expectMessageClass(SelfRemoved.class); probe2.expectMessageClass(SelfRemoved.class);
} finally { } finally {
// TODO no java API to terminate actor system system1.terminate();
Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds")); system1.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
system2.terminate();
system2.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
} }
} }
} }

View file

@ -38,11 +38,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
case class Probe(message: String, replyTo: ActorRef[String]) case class Probe(message: String, replyTo: ActorRef[String])
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)( def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(
block: ActorSystem[T] => Unit): Terminated = { block: ActorSystem[T] => Unit): Unit = {
val sys = system(behavior, s"$suite-$name") val sys = system(behavior, s"$suite-$name")
try { try {
block(sys) block(sys)
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue if (doTerminate) {
sys.terminate()
sys.whenTerminated.futureValue
}
} catch { } catch {
case NonFatal(ex) => case NonFatal(ex) =>
sys.terminate() sys.terminate()
@ -52,20 +55,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
"An ActorSystem" must { "An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in { "start the guardian actor and terminate when it terminates" in {
val t = withSystem("a", Behaviors.receiveMessage[Probe] { p => withSystem("a", Behaviors.receiveMessage[Probe] { p =>
p.replyTo ! p.message p.replyTo ! p.message
Behaviors.stopped Behaviors.stopped
}, doTerminate = false) { sys => }, doTerminate = false) { sys
val inbox = TestInbox[String]("a") val inbox = TestInbox[String]("a")
sys ! Probe("hello", inbox.ref) sys ! Probe("hello", inbox.ref)
eventually { eventually {
inbox.hasMessages should ===(true) inbox.hasMessages should ===(true)
} }
inbox.receiveAll() should ===("hello" :: Nil) inbox.receiveAll() should ===("hello" :: Nil)
sys.whenTerminated.futureValue
} }
val p = t.ref.path
p.name should ===("/")
p.address.system should ===(suite + "-a")
} }
// see issue #24172 // see issue #24172
@ -97,7 +98,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
inbox.receiveAll() should ===("started" :: Nil) inbox.receiveAll() should ===("started" :: Nil)
// now we know that the guardian has started, and should receive PostStop // now we know that the guardian has started, and should receive PostStop
sys.terminate().futureValue sys.terminate()
sys.whenTerminated.futureValue
inbox.receiveAll() should ===("done" :: Nil) inbox.receiveAll() should ===("done" :: Nil)
} }
@ -108,7 +110,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// for this case the guardian might not have been started before // for this case the guardian might not have been started before
// the system terminates and then it will not receive PostStop, which // the system terminates and then it will not receive PostStop, which
// is OK since it wasn't really started yet // is OK since it wasn't really started yet
sys.terminate().futureValue sys.terminate()
sys.whenTerminated.futureValue
} }
"log to the event stream" in { "log to the event stream" in {

View file

@ -185,7 +185,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
if (down) { if (down) {
// abrupt termination // abrupt termination
Await.ready(system2.terminate(), 10.seconds) system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
clusterNode1.manager ! Down(clusterNode2.selfMember.address) clusterNode1.manager ! Down(clusterNode2.selfMember.address)
} else { } else {
clusterNode1.manager ! Leave(clusterNode2.selfMember.address) clusterNode1.manager ! Leave(clusterNode2.selfMember.address)
@ -295,7 +296,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
regProbe1.expectMessage(Pong) regProbe1.expectMessage(Pong)
// abrupt termination // abrupt termination
Await.ready(system2.terminate(), 10.seconds) system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
clusterNode1.manager ! Down(clusterNode2.selfMember.address) clusterNode1.manager ! Down(clusterNode2.selfMember.address)
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
@ -337,7 +339,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
// abrupt termination but then a node with the same host:port comes online quickly // abrupt termination but then a node with the same host:port comes online quickly
system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress)
Await.ready(system2.terminate(), 10.seconds) system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
val testKit3 = ActorTestKit( val testKit3 = ActorTestKit(
system1.name, system1.name,
@ -447,7 +450,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
// abrupt termination but then a node with the same host:port comes online quickly // abrupt termination but then a node with the same host:port comes online quickly
system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress)
Await.ready(system2.terminate(), 10.seconds) system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
val testKit3 = ActorTestKit( val testKit3 = ActorTestKit(
system1.name, system1.name,

View file

@ -71,8 +71,10 @@ class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually
val cluster1 = Cluster(system1) val cluster1 = Cluster(system1)
val cluster2 = Cluster(system2) val cluster2 = Cluster(system2)
} finally { } finally {
system1.terminate().futureValue system1.terminate()
system2.terminate().futureValue system1.whenTerminated.futureValue
system2.terminate()
system2.whenTerminated.futureValue
} }
} }
@ -139,8 +141,10 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually
cluster2.isTerminated shouldEqual true cluster2.isTerminated shouldEqual true
} }
} finally { } finally {
system.terminate().futureValue system.terminate()
system2.terminate().futureValue system.whenTerminated.futureValue
system2.terminate()
system2.whenTerminated.futureValue
} }
} }
@ -220,9 +224,12 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually
system3.whenTerminated.futureValue system3.whenTerminated.futureValue
} finally { } finally {
system1.terminate().futureValue system1.terminate()
system2.terminate().futureValue system1.whenTerminated.futureValue
system3.terminate().futureValue system2.terminate()
system2.whenTerminated.futureValue
system3.terminate()
system3.whenTerminated.futureValue
} }
} }
} }