#24303 ActorSystem.getWhenTerminated which returns CompletionStage

This commit is contained in:
Evgeny Veretennikov 2018-01-18 16:36:21 +03:00 committed by Johan Andrén
parent 22b256dd4c
commit 5c68f2f627
7 changed files with 101 additions and 3 deletions

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.concurrent.CompletionStage;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;
public class ActorSystemTest extends JUnitSuite {
@Rule
public final AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ActorSystemTest");
private ActorSystem system = null;
@Before
public void beforeEach() {
system = actorSystemResource.getSystem();
}
@Test
public void testGetWhenTerminated() throws Exception {
system.terminate();
final CompletionStage<Terminated> cs = system.getWhenTerminated();
cs.toCompletableFuture().get(2, SECONDS);
}
@Test
public void testGetWhenTerminatedWithoutTermination() {
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
}
}

View file

@ -0,0 +1,29 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.concurrent.CompletionStage;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;
public class ActorSystemTest extends JUnitSuite {
@Test
public void testGetWhenTerminated() throws Exception {
final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedSystem");
system.terminate();
final CompletionStage<Terminated> cs = system.getWhenTerminated();
cs.toCompletableFuture().get(2, SECONDS);
}
@Test
public void testGetWhenTerminatedWithoutTermination() {
final ActorSystem system = ActorSystem.create(Behavior.empty(), "GetWhenTerminatedWithoutTermination");
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
}
}

View file

@ -5,7 +5,7 @@ package akka.actor.typed
import scala.concurrent.ExecutionContext
import akka.{ actor a, event e }
import java.util.concurrent.ThreadFactory
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import akka.actor.setup.ActorSystemSetup
import com.typesafe.config.{ Config, ConfigFactory }
@ -122,6 +122,12 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions {
*/
def whenTerminated: Future[Terminated]
/**
* Returns a CompletionStage which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed.
*/
def getWhenTerminated: CompletionStage[Terminated]
/**
* The deadLetter address is a destination that will accept (and discard)
* every message sent to it.

View file

@ -5,6 +5,8 @@ package akka.actor.typed
package internal
package adapter
import java.util.concurrent.CompletionStage
import akka.actor.InvalidMessageException
import akka.{ actor a }
@ -15,6 +17,8 @@ import scala.concurrent.Future
import akka.annotation.InternalApi
import akka.event.typed.EventStream
import scala.compat.java8.FutureConverters
/**
* INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context).
* Therefore it does not have a lot of vals, only the whenTerminated Future is cached after
@ -76,6 +80,8 @@ import akka.event.typed.EventStream
untyped.terminate().map(t Terminated(ActorRefAdapter(t.actor))(null))(sameThreadExecutionContext)
override lazy val whenTerminated: scala.concurrent.Future[akka.actor.typed.Terminated] =
untyped.whenTerminated.map(t Terminated(ActorRefAdapter(t.actor))(null))(sameThreadExecutionContext)
override lazy val getWhenTerminated: CompletionStage[akka.actor.typed.Terminated] =
FutureConverters.toJava(whenTerminated)
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)(implicit timeout: Timeout): Future[ActorRef[U]] = {
val ref = untyped.systemActorOf(PropsAdapter(() behavior, props), name)

View file

@ -0,0 +1,2 @@
# #24330 ActorSystem.getWhenTerminated
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorSystem.getWhenTerminated")

View file

@ -5,7 +5,7 @@
package akka.actor
import java.io.Closeable
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, RejectedExecutionException, ThreadFactory }
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.{ Config, ConfigFactory }
@ -25,6 +25,7 @@ import java.util.Optional
import akka.actor.setup.{ ActorSystemSetup, Setup }
import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters._
object BootstrapSetup {
@ -549,6 +550,16 @@ abstract class ActorSystem extends ActorRefFactory {
*/
def whenTerminated: Future[Terminated]
/**
* Returns a CompletionStage which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed. If you registered any callback with
* [[ActorSystem#registerOnTermination]], the returned CompletionStage from this method will not complete
* until all the registered callbacks are finished. Be careful to not schedule any operations
* on the `dispatcher` of this actor system as it will have been shut down before this
* future completes.
*/
def getWhenTerminated: CompletionStage[Terminated]
/**
* Registers the provided extension and creates its payload, if this extension isn't already registered
* This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization
@ -784,6 +795,7 @@ private[akka] class ActorSystemImpl(
private[this] final val terminationCallbacks = new TerminationCallbacks(provider.terminationFuture)(dispatcher)
override def whenTerminated: Future[Terminated] = terminationCallbacks.terminationFuture
override def getWhenTerminated: CompletionStage[Terminated] = FutureConverters.toJava(whenTerminated)
def lookupRoot: InternalActorRef = provider.rootGuardian
def guardian: LocalActorRef = provider.guardian
def systemGuardian: LocalActorRef = provider.systemGuardian

View file

@ -4,7 +4,7 @@
package akka.actor.typed
package internal
import java.util.concurrent.ThreadFactory
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import akka.annotation.InternalApi
import akka.event.Logging
@ -13,6 +13,7 @@ import akka.util.Timeout
import akka.{ actor a, event e }
import com.typesafe.config.ConfigFactory
import scala.compat.java8.FutureConverters
import scala.concurrent._
/**
@ -62,6 +63,7 @@ import scala.concurrent._
terminationPromise.future
}
override def whenTerminated: Future[akka.actor.typed.Terminated] = terminationPromise.future
override def getWhenTerminated: CompletionStage[Terminated] = FutureConverters.toJava(whenTerminated)
override val startTime: Long = System.currentTimeMillis()
override def uptime: Long = System.currentTimeMillis() - startTime
override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory {