Merge pull request #26644 from akka/wip-whenTerminate-patriknw

Change typed whenTerminated sig to Future[Done], #25647
This commit is contained in:
Patrik Nordwall 2019-04-01 10:28:21 +02:00 committed by GitHub
commit a023b15759
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 91 additions and 62 deletions

View file

@ -17,12 +17,12 @@ import akka.actor.typed.{
ExtensionId,
Logger,
Props,
Settings,
Terminated
Settings
}
import akka.annotation.InternalApi
import akka.util.Timeout
import akka.{ actor => untyped }
import akka.Done
import com.typesafe.config.ConfigFactory
import scala.compat.java8.FutureConverters
import scala.concurrent._
@ -73,13 +73,10 @@ import akka.actor.typed.internal.InternalRecipientRef
override def scheduler: untyped.Scheduler = throw new UnsupportedOperationException("no scheduler")
private val terminationPromise = Promise[Terminated]
override def terminate(): Future[akka.actor.typed.Terminated] = {
terminationPromise.trySuccess(Terminated(this))
terminationPromise.future
}
override def whenTerminated: Future[akka.actor.typed.Terminated] = terminationPromise.future
override def getWhenTerminated: CompletionStage[Terminated] = FutureConverters.toJava(whenTerminated)
private val terminationPromise = Promise[Done]
override def terminate(): Unit = terminationPromise.trySuccess(Done)
override def whenTerminated: Future[Done] = terminationPromise.future
override def getWhenTerminated: CompletionStage[Done] = FutureConverters.toJava(whenTerminated)
override val startTime: Long = System.currentTimeMillis()
override def uptime: Long = System.currentTimeMillis() - startTime
override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory {

View file

@ -4,8 +4,9 @@
package akka.actor.testkit.typed.scaladsl
import scala.concurrent.Promise
import akka.Done
import scala.concurrent.Promise
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.BeforeAndAfterAll
@ -60,7 +61,7 @@ class ActorTestKitSpec extends ScalaTestWithActorTestKit with WordSpecLike {
// usually done in test framework hook method but we want to assert
val testkit2 = ActorTestKit()
testkit2.shutdownTestKit()
testkit2.system.whenTerminated.futureValue shouldBe a[Terminated]
testkit2.system.whenTerminated.futureValue shouldBe a[Done]
}
}

View file

@ -5,6 +5,7 @@
/** Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com> */
package akka.actor.typed;
import akka.Done;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
@ -17,15 +18,16 @@ public class ActorSystemTest extends JUnitSuite {
@Test
public void testGetWhenTerminated() throws Exception {
final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem");
final ActorSystem<Void> system =
ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem");
system.terminate();
final CompletionStage<Terminated> cs = system.getWhenTerminated();
final CompletionStage<Done> cs = system.getWhenTerminated();
cs.toCompletableFuture().get(2, SECONDS);
}
@Test
public void testGetWhenTerminatedWithoutTermination() {
final ActorSystem system =
final ActorSystem<Void> system =
ActorSystem.create(Behavior.empty(), "GetWhenTerminatedWithoutTermination");
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
}

View file

@ -264,6 +264,9 @@ class ExtensionsSpec extends ScalaTestWithActorTestKit with WordSpecLike {
}
try f(sys)
finally sys.terminate().futureValue
finally {
sys.terminate()
sys.whenTerminated.futureValue
}
}
}

View file

@ -27,11 +27,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
case class Probe(message: String, replyTo: ActorRef[String])
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(
block: ActorSystem[T] => Unit): Terminated = {
block: ActorSystem[T] Unit): Unit = {
val sys = system(behavior, s"$suite-$name")
try {
block(sys)
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue
if (doTerminate) {
sys.terminate()
sys.whenTerminated.futureValue
}
} catch {
case NonFatal(ex) =>
sys.terminate()
@ -41,20 +44,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
"An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in {
val t = withSystem("a", Behaviors.receiveMessage[Probe] { p =>
withSystem("a", Behaviors.receiveMessage[Probe] { p =>
p.replyTo ! p.message
Behaviors.stopped
}, doTerminate = false) { sys =>
}, doTerminate = false) { sys
val inbox = TestInbox[String]("a")
sys ! Probe("hello", inbox.ref)
eventually {
inbox.hasMessages should ===(true)
}
inbox.receiveAll() should ===("hello" :: Nil)
sys.whenTerminated.futureValue
}
val p = t.ref.path
p.name should ===("/")
p.address.system should ===(suite + "-a")
}
// see issue #24172
@ -86,7 +87,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
inbox.receiveAll() should ===("started" :: Nil)
// now we know that the guardian has started, and should receive PostStop
sys.terminate().futureValue
sys.terminate()
sys.whenTerminated.futureValue
inbox.receiveAll() should ===("done" :: Nil)
}
@ -97,7 +99,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// for this case the guardian might not have been started before
// the system terminates and then it will not receive PostStop, which
// is OK since it wasn't really started yet
sys.terminate().futureValue
sys.terminate()
sys.whenTerminated.futureValue
}
"log to the event stream" in {

View file

@ -44,7 +44,8 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with WordSpecLik
printer ! PrintMe("not message 2")
// #fire-and-forget-doit
system.terminate().futureValue
system.terminate()
system.whenTerminated.futureValue
}
"contain a sample for request response" in {

View file

@ -4,12 +4,13 @@
package akka.actor.typed
import akka.{ actor => untyped }
import java.util.concurrent.CompletionStage
import java.util.concurrent.ThreadFactory
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import akka.Done
import akka.{ actor => untyped }
import akka.actor.BootstrapSetup
import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.internal.InternalRecipientRef
@ -106,20 +107,25 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter
* Terminates this actor system. This will stop the guardian actor, which in turn
* will recursively stop all its child actors, then the system guardian
* (below which the logging actors reside).
*
* This is an asynchronous operation and completion of the termination can
* be observed with [[ActorSystem.whenTerminated]] or [[ActorSystem.getWhenTerminated]].
*/
def terminate(): Future[Terminated]
def terminate(): Unit
/**
* Returns a Future which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed.
* Scala API: Returns a Future which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]]
* or by stopping the guardian actor.
*/
def whenTerminated: Future[Terminated]
def whenTerminated: Future[Done]
/**
* Returns a CompletionStage which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed.
* Java API: Returns a CompletionStage which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed. The `ActorSystem` can be stopped with [[ActorSystem.terminate]]
* or by stopping the guardian actor.
*/
def getWhenTerminated: CompletionStage[Terminated]
def getWhenTerminated: CompletionStage[Done]
/**
* The deadLetter address is a destination that will accept (and discard)

View file

@ -9,6 +9,7 @@ package adapter
import java.util.concurrent.CompletionStage
import akka.actor
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.actor.InvalidMessageException
import akka.{ actor => untyped }
@ -91,11 +92,10 @@ import akka.event.LoggingFilterWithMarker
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
override def terminate(): scala.concurrent.Future[akka.actor.typed.Terminated] =
untypedSystem.terminate().map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext)
override lazy val whenTerminated: scala.concurrent.Future[akka.actor.typed.Terminated] =
untypedSystem.whenTerminated.map(t => Terminated(ActorRefAdapter(t.actor)))(sameThreadExecutionContext)
override lazy val getWhenTerminated: CompletionStage[akka.actor.typed.Terminated] =
override def terminate(): Unit = untypedSystem.terminate()
override lazy val whenTerminated: scala.concurrent.Future[akka.Done] =
untypedSystem.whenTerminated.map(_ => Done)(sameThreadExecutionContext)
override lazy val getWhenTerminated: CompletionStage[akka.Done] =
FutureConverters.toJava(whenTerminated)
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)(

View file

@ -11,8 +11,8 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class ClusterApiTest extends JUnitSuite {
@ -56,8 +56,10 @@ public class ClusterApiTest extends JUnitSuite {
probe2.expectMessageClass(SelfRemoved.class);
} finally {
// TODO no java API to terminate actor system
Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds"));
system1.terminate();
system1.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
system2.terminate();
system2.getWhenTerminated().toCompletableFuture().get(5, TimeUnit.SECONDS);
}
}
}

View file

@ -38,11 +38,14 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
case class Probe(message: String, replyTo: ActorRef[String])
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(
block: ActorSystem[T] => Unit): Terminated = {
block: ActorSystem[T] => Unit): Unit = {
val sys = system(behavior, s"$suite-$name")
try {
block(sys)
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue
if (doTerminate) {
sys.terminate()
sys.whenTerminated.futureValue
}
} catch {
case NonFatal(ex) =>
sys.terminate()
@ -52,20 +55,18 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
"An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in {
val t = withSystem("a", Behaviors.receiveMessage[Probe] { p =>
withSystem("a", Behaviors.receiveMessage[Probe] { p =>
p.replyTo ! p.message
Behaviors.stopped
}, doTerminate = false) { sys =>
}, doTerminate = false) { sys
val inbox = TestInbox[String]("a")
sys ! Probe("hello", inbox.ref)
eventually {
inbox.hasMessages should ===(true)
}
inbox.receiveAll() should ===("hello" :: Nil)
sys.whenTerminated.futureValue
}
val p = t.ref.path
p.name should ===("/")
p.address.system should ===(suite + "-a")
}
// see issue #24172
@ -97,7 +98,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
inbox.receiveAll() should ===("started" :: Nil)
// now we know that the guardian has started, and should receive PostStop
sys.terminate().futureValue
sys.terminate()
sys.whenTerminated.futureValue
inbox.receiveAll() should ===("done" :: Nil)
}
@ -108,7 +110,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// for this case the guardian might not have been started before
// the system terminates and then it will not receive PostStop, which
// is OK since it wasn't really started yet
sys.terminate().futureValue
sys.terminate()
sys.whenTerminated.futureValue
}
"log to the event stream" in {

View file

@ -185,7 +185,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
if (down) {
// abrupt termination
Await.ready(system2.terminate(), 10.seconds)
system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
clusterNode1.manager ! Down(clusterNode2.selfMember.address)
} else {
clusterNode1.manager ! Leave(clusterNode2.selfMember.address)
@ -295,7 +296,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
regProbe1.expectMessage(Pong)
// abrupt termination
Await.ready(system2.terminate(), 10.seconds)
system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
clusterNode1.manager ! Down(clusterNode2.selfMember.address)
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
@ -337,7 +339,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
// abrupt termination but then a node with the same host:port comes online quickly
system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress)
Await.ready(system2.terminate(), 10.seconds)
system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
val testKit3 = ActorTestKit(
system1.name,
@ -447,7 +450,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
// abrupt termination but then a node with the same host:port comes online quickly
system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress)
Await.ready(system2.terminate(), 10.seconds)
system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds)
val testKit3 = ActorTestKit(
system1.name,

View file

@ -71,8 +71,10 @@ class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually
val cluster1 = Cluster(system1)
val cluster2 = Cluster(system2)
} finally {
system1.terminate().futureValue
system2.terminate().futureValue
system1.terminate()
system1.whenTerminated.futureValue
system2.terminate()
system2.whenTerminated.futureValue
}
}
@ -139,8 +141,10 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually
cluster2.isTerminated shouldEqual true
}
} finally {
system.terminate().futureValue
system2.terminate().futureValue
system.terminate()
system.whenTerminated.futureValue
system2.terminate()
system2.whenTerminated.futureValue
}
}
@ -220,9 +224,12 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually
system3.whenTerminated.futureValue
} finally {
system1.terminate().futureValue
system2.terminate().futureValue
system3.terminate().futureValue
system1.terminate()
system1.whenTerminated.futureValue
system2.terminate()
system2.whenTerminated.futureValue
system3.terminate()
system3.whenTerminated.futureValue
}
}
}