Merge pull request #24958 from akka/wip-typed-extension-setup-patriknw
Possibility to replace Typed extensions via ActorSystemSetup, #24954
This commit is contained in:
commit
74eb7999d8
11 changed files with 206 additions and 13 deletions
|
|
@ -5,14 +5,17 @@
|
|||
package akka.actor.typed;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.actor.setup.ActorSystemSetup;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static junit.framework.TestCase.assertSame;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ExtensionsTest extends JUnitSuite {
|
||||
|
|
@ -40,6 +43,15 @@ public class ExtensionsTest extends JUnitSuite {
|
|||
}
|
||||
}
|
||||
|
||||
public static class MyExtImplViaSetup extends MyExtImpl {
|
||||
}
|
||||
|
||||
public static class MyExtensionSetup extends ExtensionSetup<MyExtImpl> {
|
||||
public MyExtensionSetup(Function<ActorSystem<?>, MyExtImpl> createExtension) {
|
||||
super(MyExtension.getInstance(), createExtension);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void loadJavaExtensionsFromConfig() {
|
||||
|
|
@ -76,5 +88,24 @@ public class ExtensionsTest extends JUnitSuite {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void overrideExtensionsViaActorSystemSetup() {
|
||||
final ActorSystem<Object> system = ActorSystem.create(
|
||||
Behavior.empty(),
|
||||
"overrideExtensionsViaActorSystemSetup",
|
||||
ActorSystemSetup.create(new MyExtensionSetup(sys -> new MyExtImplViaSetup())));
|
||||
|
||||
try {
|
||||
MyExtImpl instance1 = MyExtension.get(system);
|
||||
assertEquals(MyExtImplViaSetup.class, instance1.getClass());
|
||||
|
||||
MyExtImpl instance2 = MyExtension.get(system);
|
||||
assertSame(instance1, instance2);
|
||||
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,12 +10,17 @@ import com.typesafe.config.{ Config, ConfigFactory }
|
|||
import scala.concurrent.Future
|
||||
|
||||
import akka.actor.BootstrapSetup
|
||||
import akka.actor.setup.ActorSystemSetup
|
||||
|
||||
class DummyExtension1 extends Extension
|
||||
object DummyExtension1 extends ExtensionId[DummyExtension1] {
|
||||
def createExtension(system: ActorSystem[_]) = new DummyExtension1
|
||||
def get(system: ActorSystem[_]): DummyExtension1 = apply(system)
|
||||
}
|
||||
class DummyExtension1Setup(factory: ActorSystem[_] ⇒ DummyExtension1)
|
||||
extends AbstractExtensionSetup[DummyExtension1](DummyExtension1, factory)
|
||||
|
||||
class DummyExtension1ViaSetup extends DummyExtension1
|
||||
|
||||
class SlowExtension extends Extension
|
||||
object SlowExtension extends ExtensionId[SlowExtension] {
|
||||
|
|
@ -184,12 +189,34 @@ class ExtensionsSpec extends TypedAkkaSpec {
|
|||
untypedSystem.terminate().futureValue
|
||||
}
|
||||
}
|
||||
|
||||
"override extensions via ActorSystemSetup" in
|
||||
withEmptyActorSystem("ExtensionsSpec10", Some(ConfigFactory.parseString(
|
||||
"""
|
||||
akka.typed.extensions = ["akka.actor.typed.DummyExtension1$", "akka.actor.typed.SlowExtension$"]
|
||||
""")),
|
||||
Some(ActorSystemSetup(new DummyExtension1Setup(sys ⇒ new DummyExtension1ViaSetup)))
|
||||
) { system ⇒
|
||||
system.hasExtension(DummyExtension1) should ===(true)
|
||||
system.extension(DummyExtension1) shouldBe a[DummyExtension1ViaSetup]
|
||||
DummyExtension1(system) shouldBe a[DummyExtension1ViaSetup]
|
||||
DummyExtension1(system) shouldBe theSameInstanceAs(DummyExtension1(system))
|
||||
|
||||
system.hasExtension(SlowExtension) should ===(true)
|
||||
system.extension(SlowExtension) shouldBe a[SlowExtension]
|
||||
}
|
||||
}
|
||||
|
||||
def withEmptyActorSystem[T](name: String, config: Option[Config] = None)(f: ActorSystem[_] ⇒ T): T = {
|
||||
val system = config match {
|
||||
case None ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name)
|
||||
case Some(c) ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name, c)
|
||||
def withEmptyActorSystem[T](name: String, config: Option[Config] = None, setup: Option[ActorSystemSetup] = None)(
|
||||
f: ActorSystem[_] ⇒ T): T = {
|
||||
|
||||
val bootstrap = config match {
|
||||
case Some(c) ⇒ BootstrapSetup(c)
|
||||
case None ⇒ BootstrapSetup()
|
||||
}
|
||||
val system = setup match {
|
||||
case None ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name, bootstrap)
|
||||
case Some(s) ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name, s.and(bootstrap))
|
||||
}
|
||||
|
||||
try f(system) finally system.terminate().futureValue
|
||||
|
|
|
|||
|
|
@ -38,3 +38,18 @@ class ActorRefResolver(system: ActorSystem[_]) extends Extension {
|
|||
untypedSystem.provider.resolveActorRef(serializedActorRef)
|
||||
}
|
||||
|
||||
object ActorRefResolverSetup {
|
||||
def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ ActorRefResolver): ActorRefResolverSetup =
|
||||
new ActorRefResolverSetup(new java.util.function.Function[ActorSystem[_], ActorRefResolver] {
|
||||
override def apply(sys: ActorSystem[_]): ActorRefResolver = createExtension(sys)
|
||||
}) // TODO can be simplified when compiled only with Scala >= 2.12
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]]
|
||||
* to replace the default implementation of the [[ActorRefResolver]] extension. Intended
|
||||
* for tests that need to replace extension with stub/mock implementations.
|
||||
*/
|
||||
final class ActorRefResolverSetup(createExtension: java.util.function.Function[ActorSystem[_], ActorRefResolver])
|
||||
extends ExtensionSetup[ActorRefResolver](ActorRefResolver, createExtension)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.actor.typed
|
||||
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.actor.setup.Setup
|
||||
|
||||
/**
|
||||
* Marker trait/interface for extensions. An extension can be registered in the ActorSystem and is guaranteed to only
|
||||
|
|
@ -97,7 +98,12 @@ trait Extension
|
|||
* MyExt.get(system).someMethodOnTheExtension()
|
||||
* }}}
|
||||
*
|
||||
* For testing purposes extensions typically provide a concrete [[ExtensionSetup]]
|
||||
* that can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]]
|
||||
* to replace the default implementation of the extension.
|
||||
*
|
||||
* @tparam T The concrete extension type
|
||||
* @see [[ExtensionSetup]]
|
||||
*/
|
||||
abstract class ExtensionId[T <: Extension] {
|
||||
|
||||
|
|
@ -113,6 +119,11 @@ abstract class ExtensionId[T <: Extension] {
|
|||
|
||||
override final def hashCode: Int = System.identityHashCode(this)
|
||||
override final def equals(other: Any): Boolean = this eq other.asInstanceOf[AnyRef]
|
||||
|
||||
/**
|
||||
* Java API: The identifier of the extension
|
||||
*/
|
||||
def id: ExtensionId[T] = this
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -144,3 +155,24 @@ trait Extensions {
|
|||
def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* Each extension typically provide a concrete `ExtensionSetup` that can be used in
|
||||
* [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] to replace the default
|
||||
* implementation of the extension. Intended for tests that need to replace
|
||||
* extension with stub/mock implementations.
|
||||
*/
|
||||
abstract class ExtensionSetup[T <: Extension](
|
||||
val extId: ExtensionId[T],
|
||||
val createExtension: java.util.function.Function[ActorSystem[_], T])
|
||||
extends Setup
|
||||
|
||||
/**
|
||||
* Scala 2.11 API: Each extension typically provide a concrete `ExtensionSetup` that can be used in
|
||||
* [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] to replace the default
|
||||
* implementation of the extension. Intended for tests that need to replace
|
||||
* extension with stub/mock implementations.
|
||||
*/
|
||||
abstract class AbstractExtensionSetup[T <: Extension](extId: ExtensionId[T], createExtension: ActorSystem[_] ⇒ T)
|
||||
extends ExtensionSetup[T](extId, new java.util.function.Function[ActorSystem[_], T] {
|
||||
override def apply(sys: ActorSystem[_]): T = createExtension.apply(sys)
|
||||
}) // TODO can be simplified when compiled only with Scala >= 2.12
|
||||
|
|
|
|||
|
|
@ -8,11 +8,12 @@ import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch }
|
|||
|
||||
import akka.annotation.InternalApi
|
||||
import akka.actor.typed.{ ActorSystem, Extension, ExtensionId, Extensions }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import akka.actor.typed.ExtensionSetup
|
||||
|
||||
/**
|
||||
* Actor system extensions registry
|
||||
*
|
||||
|
|
@ -29,6 +30,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒
|
|||
* Hook for ActorSystem to load extensions on startup
|
||||
*/
|
||||
@InternalApi private[akka] def loadExtensions(): Unit = {
|
||||
|
||||
/**
|
||||
* @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)
|
||||
*/
|
||||
|
|
@ -81,10 +83,13 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒
|
|||
val inProcessOfRegistration = new CountDownLatch(1)
|
||||
extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process
|
||||
case null ⇒ try { // Signal was successfully sent
|
||||
// Create and initialize the extension
|
||||
ext.createExtension(self) match {
|
||||
// Create and initialize the extension, first look for ExtensionSetup
|
||||
val instance = self.settings.setup.setups.collectFirst {
|
||||
case (_, extSetup: ExtensionSetup[_]) if extSetup.extId == ext ⇒ extSetup.createExtension(self)
|
||||
}.getOrElse(ext.createExtension(self))
|
||||
instance match {
|
||||
case null ⇒ throw new IllegalStateException("Extension instance created as 'null' for extension [" + ext + "]")
|
||||
case instance ⇒
|
||||
case instance: T @unchecked ⇒
|
||||
// Replace our in process signal with the initialized extension
|
||||
extensions.replace(ext, inProcessOfRegistration, instance)
|
||||
instance
|
||||
|
|
|
|||
|
|
@ -7,11 +7,12 @@ package akka.actor.typed.receptionist
|
|||
import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId }
|
||||
import akka.actor.typed.internal.receptionist._
|
||||
import akka.annotation.DoNotInherit
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.duration._
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import akka.actor.typed.ExtensionSetup
|
||||
|
||||
class Receptionist(system: ActorSystem[_]) extends Extension {
|
||||
|
||||
private def hasCluster: Boolean = {
|
||||
|
|
@ -274,3 +275,19 @@ object Receptionist extends ExtensionId[Receptionist] {
|
|||
Listing(key, Set[ActorRef[T]](serviceInstances.asScala.toSeq: _*))
|
||||
|
||||
}
|
||||
|
||||
object ReceptionistSetup {
|
||||
def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ Receptionist): ReceptionistSetup =
|
||||
new ReceptionistSetup(new java.util.function.Function[ActorSystem[_], Receptionist] {
|
||||
override def apply(sys: ActorSystem[_]): Receptionist = createExtension(sys)
|
||||
}) // TODO can be simplified when compiled only with Scala >= 2.12
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]]
|
||||
* to replace the default implementation of the [[Receptionist]] extension. Intended
|
||||
* for tests that need to replace extension with stub/mock implementations.
|
||||
*/
|
||||
final class ReceptionistSetup(createExtension: java.util.function.Function[ActorSystem[_], Receptionist])
|
||||
extends ExtensionSetup[Receptionist](Receptionist, createExtension)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ import scala.annotation.varargs
|
|||
import scala.compat.java8.OptionConverters._
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
/**
|
||||
* Marker supertype for a setup part that can be put inside [[ActorSystemSetup]], if a specific concrete setup
|
||||
* is not specified in the actor system setup that means defaults are used (usually from the config file) - no concrete
|
||||
|
|
@ -49,7 +51,7 @@ object ActorSystemSetup {
|
|||
* Constructor is *Internal API*. Use the factory methods [[ActorSystemSetup#create]] and [[akka.actor.Actor#apply]] to create
|
||||
* instances.
|
||||
*/
|
||||
final class ActorSystemSetup private[akka] (setups: Map[Class[_], AnyRef]) {
|
||||
final class ActorSystemSetup private[akka] (@InternalApi private[akka] val setups: Map[Class[_], AnyRef]) {
|
||||
|
||||
/**
|
||||
* Java API: Extract a concrete [[Setup]] of type `T` if it is defined in the settings.
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import scala.concurrent.Future
|
|||
|
||||
import akka.actor.Scheduler
|
||||
import akka.util.Timeout
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
|
|
@ -17,6 +16,7 @@ import akka.actor.typed.ActorSystem
|
|||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Extension
|
||||
import akka.actor.typed.ExtensionId
|
||||
import akka.actor.typed.ExtensionSetup
|
||||
import akka.actor.typed.Props
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.annotation.InternalApi
|
||||
|
|
@ -305,3 +305,18 @@ object EntityRef {
|
|||
}
|
||||
}
|
||||
|
||||
object ClusterShardingSetup {
|
||||
def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ ClusterSharding): ClusterShardingSetup =
|
||||
new ClusterShardingSetup(new java.util.function.Function[ActorSystem[_], ClusterSharding] {
|
||||
override def apply(sys: ActorSystem[_]): ClusterSharding = createExtension(sys)
|
||||
}) // TODO can be simplified when compiled only with Scala >= 2.12
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]]
|
||||
* to replace the default implementation of the [[ClusterSharding]] extension. Intended
|
||||
* for tests that need to replace extension with stub/mock implementations.
|
||||
*/
|
||||
final class ClusterShardingSetup(createExtension: java.util.function.Function[ActorSystem[_], ClusterSharding])
|
||||
extends ExtensionSetup[ClusterSharding](ClusterSharding, createExtension)
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import akka.actor.typed.ActorSystem
|
|||
import akka.actor.typed.Extension
|
||||
import akka.actor.typed.ExtensionId
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ExtensionSetup
|
||||
|
||||
object DistributedData extends ExtensionId[DistributedData] {
|
||||
def get(system: ActorSystem[_]): DistributedData = apply(system)
|
||||
|
|
@ -35,3 +36,18 @@ class DistributedData(system: ActorSystem[_]) extends Extension {
|
|||
|
||||
}
|
||||
|
||||
object DistributedDataSetup {
|
||||
def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ DistributedData): DistributedDataSetup =
|
||||
new DistributedDataSetup(new java.util.function.Function[ActorSystem[_], DistributedData] {
|
||||
override def apply(sys: ActorSystem[_]): DistributedData = createExtension(sys)
|
||||
}) // TODO can be simplified when compiled only with Scala >= 2.12
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]]
|
||||
* to replace the default implementation of the [[DistributedData]] extension. Intended
|
||||
* for tests that need to replace extension with stub/mock implementations.
|
||||
*/
|
||||
final class DistributedDataSetup(createExtension: java.util.function.Function[ActorSystem[_], DistributedData])
|
||||
extends ExtensionSetup[DistributedData](DistributedData, createExtension)
|
||||
|
|
|
|||
|
|
@ -11,9 +11,10 @@ import akka.cluster._
|
|||
import akka.japi.Util
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId }
|
||||
import akka.cluster.typed.internal.AdapterClusterImpl
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
import akka.actor.typed.ExtensionSetup
|
||||
|
||||
/**
|
||||
* Messages for subscribing to changes in the cluster state
|
||||
*
|
||||
|
|
@ -183,3 +184,19 @@ abstract class Cluster extends Extension {
|
|||
def manager: ActorRef[ClusterCommand]
|
||||
|
||||
}
|
||||
|
||||
object ClusterSetup {
|
||||
def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ Cluster): ClusterSetup =
|
||||
new ClusterSetup(new java.util.function.Function[ActorSystem[_], Cluster] {
|
||||
override def apply(sys: ActorSystem[_]): Cluster = createExtension(sys)
|
||||
}) // TODO can be simplified when compiled only with Scala >= 2.12
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]]
|
||||
* to replace the default implementation of the [[Cluster]] extension. Intended
|
||||
* for tests that need to replace extension with stub/mock implementations.
|
||||
*/
|
||||
final class ClusterSetup(createExtension: java.util.function.Function[ActorSystem[_], Cluster])
|
||||
extends ExtensionSetup[Cluster](Cluster, createExtension)
|
||||
|
|
|
|||
|
|
@ -13,9 +13,10 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionI
|
|||
import akka.util.JavaDurationConverters._
|
||||
import com.typesafe.config.Config
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
|
||||
import akka.actor.typed.ExtensionSetup
|
||||
|
||||
object ClusterSingletonSettings {
|
||||
def apply(
|
||||
system: ActorSystem[_]
|
||||
|
|
@ -239,3 +240,18 @@ final class ClusterSingletonManagerSettings(
|
|||
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
|
||||
}
|
||||
|
||||
object ClusterSingletonSetup {
|
||||
def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ ClusterSingleton): ClusterSingletonSetup =
|
||||
new ClusterSingletonSetup(new java.util.function.Function[ActorSystem[_], ClusterSingleton] {
|
||||
override def apply(sys: ActorSystem[_]): ClusterSingleton = createExtension(sys)
|
||||
}) // TODO can be simplified when compiled only with Scala >= 2.12
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]]
|
||||
* to replace the default implementation of the [[ClusterSingleton]] extension. Intended
|
||||
* for tests that need to replace extension with stub/mock implementations.
|
||||
*/
|
||||
final class ClusterSingletonSetup(createExtension: java.util.function.Function[ActorSystem[_], ClusterSingleton])
|
||||
extends ExtensionSetup[ClusterSingleton](ClusterSingleton, createExtension)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue