From d2b14f5c9cf441533b67f63d880cb8ee7269bbe1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 20 Apr 2018 16:53:57 +0100 Subject: [PATCH] Possibility to replace Typed extensions via ActorSystemSetup, #24954 --- .../java/akka/actor/typed/ExtensionsTest.java | 31 ++++++++++++++++ .../akka/actor/typed/ExtensionsSpec.scala | 35 ++++++++++++++++--- .../akka/actor/typed/ActorRefResolver.scala | 15 ++++++++ .../scala/akka/actor/typed/Extensions.scala | 32 +++++++++++++++++ .../actor/typed/internal/ExtensionsImpl.scala | 13 ++++--- .../typed/receptionist/Receptionist.scala | 19 +++++++++- .../akka/actor/setup/ActorSystemSetup.scala | 4 ++- .../typed/scaladsl/ClusterSharding.scala | 17 ++++++++- .../ddata/typed/javadsl/DistributedData.scala | 16 +++++++++ .../scala/akka/cluster/typed/Cluster.scala | 19 +++++++++- .../akka/cluster/typed/ClusterSingleton.scala | 18 +++++++++- 11 files changed, 206 insertions(+), 13 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java index 700a9b5754..a29612de9b 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java @@ -5,14 +5,17 @@ package akka.actor.typed; import akka.actor.*; +import akka.actor.setup.ActorSystemSetup; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import java.util.Optional; +import java.util.function.Function; import static junit.framework.TestCase.assertSame; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class ExtensionsTest extends JUnitSuite { @@ -40,6 +43,15 @@ public class ExtensionsTest extends JUnitSuite { } } + public static class MyExtImplViaSetup extends MyExtImpl { + } + + public static class MyExtensionSetup extends ExtensionSetup { + public MyExtensionSetup(Function, MyExtImpl> createExtension) { + super(MyExtension.getInstance(), createExtension); + } + } + @Test public void loadJavaExtensionsFromConfig() { @@ -76,5 +88,24 @@ public class ExtensionsTest extends JUnitSuite { } } + @Test + public void overrideExtensionsViaActorSystemSetup() { + final ActorSystem system = ActorSystem.create( + Behavior.empty(), + "overrideExtensionsViaActorSystemSetup", + ActorSystemSetup.create(new MyExtensionSetup(sys -> new MyExtImplViaSetup()))); + + try { + MyExtImpl instance1 = MyExtension.get(system); + assertEquals(MyExtImplViaSetup.class, instance1.getClass()); + + MyExtImpl instance2 = MyExtension.get(system); + assertSame(instance1, instance2); + + } finally { + system.terminate(); + } + } + } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala index 8598a003e4..8874f850c9 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala @@ -10,12 +10,17 @@ import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.Future import akka.actor.BootstrapSetup +import akka.actor.setup.ActorSystemSetup class DummyExtension1 extends Extension object DummyExtension1 extends ExtensionId[DummyExtension1] { def createExtension(system: ActorSystem[_]) = new DummyExtension1 def get(system: ActorSystem[_]): DummyExtension1 = apply(system) } +class DummyExtension1Setup(factory: ActorSystem[_] ⇒ DummyExtension1) + extends AbstractExtensionSetup[DummyExtension1](DummyExtension1, factory) + +class DummyExtension1ViaSetup extends DummyExtension1 class SlowExtension extends Extension object SlowExtension extends ExtensionId[SlowExtension] { @@ -184,12 +189,34 @@ class ExtensionsSpec extends TypedAkkaSpec { untypedSystem.terminate().futureValue } } + + "override extensions via ActorSystemSetup" in + withEmptyActorSystem("ExtensionsSpec10", Some(ConfigFactory.parseString( + """ + akka.typed.extensions = ["akka.actor.typed.DummyExtension1$", "akka.actor.typed.SlowExtension$"] + """)), + Some(ActorSystemSetup(new DummyExtension1Setup(sys ⇒ new DummyExtension1ViaSetup))) + ) { system ⇒ + system.hasExtension(DummyExtension1) should ===(true) + system.extension(DummyExtension1) shouldBe a[DummyExtension1ViaSetup] + DummyExtension1(system) shouldBe a[DummyExtension1ViaSetup] + DummyExtension1(system) shouldBe theSameInstanceAs(DummyExtension1(system)) + + system.hasExtension(SlowExtension) should ===(true) + system.extension(SlowExtension) shouldBe a[SlowExtension] + } } - def withEmptyActorSystem[T](name: String, config: Option[Config] = None)(f: ActorSystem[_] ⇒ T): T = { - val system = config match { - case None ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name) - case Some(c) ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name, c) + def withEmptyActorSystem[T](name: String, config: Option[Config] = None, setup: Option[ActorSystemSetup] = None)( + f: ActorSystem[_] ⇒ T): T = { + + val bootstrap = config match { + case Some(c) ⇒ BootstrapSetup(c) + case None ⇒ BootstrapSetup() + } + val system = setup match { + case None ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name, bootstrap) + case Some(s) ⇒ ActorSystem[Any](Behavior.EmptyBehavior, name, s.and(bootstrap)) } try f(system) finally system.terminate().futureValue diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala index 8f83c45aa5..4d91187b51 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRefResolver.scala @@ -38,3 +38,18 @@ class ActorRefResolver(system: ActorSystem[_]) extends Extension { untypedSystem.provider.resolveActorRef(serializedActorRef) } +object ActorRefResolverSetup { + def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ ActorRefResolver): ActorRefResolverSetup = + new ActorRefResolverSetup(new java.util.function.Function[ActorSystem[_], ActorRefResolver] { + override def apply(sys: ActorSystem[_]): ActorRefResolver = createExtension(sys) + }) // TODO can be simplified when compiled only with Scala >= 2.12 + +} + +/** + * Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] + * to replace the default implementation of the [[ActorRefResolver]] extension. Intended + * for tests that need to replace extension with stub/mock implementations. + */ +final class ActorRefResolverSetup(createExtension: java.util.function.Function[ActorSystem[_], ActorRefResolver]) + extends ExtensionSetup[ActorRefResolver](ActorRefResolver, createExtension) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala index a6f14e29c8..6cd03dfda8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Extensions.scala @@ -5,6 +5,7 @@ package akka.actor.typed import akka.annotation.DoNotInherit +import akka.actor.setup.Setup /** * Marker trait/interface for extensions. An extension can be registered in the ActorSystem and is guaranteed to only @@ -97,7 +98,12 @@ trait Extension * MyExt.get(system).someMethodOnTheExtension() * }}} * + * For testing purposes extensions typically provide a concrete [[ExtensionSetup]] + * that can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] + * to replace the default implementation of the extension. + * * @tparam T The concrete extension type + * @see [[ExtensionSetup]] */ abstract class ExtensionId[T <: Extension] { @@ -113,6 +119,11 @@ abstract class ExtensionId[T <: Extension] { override final def hashCode: Int = System.identityHashCode(this) override final def equals(other: Any): Boolean = this eq other.asInstanceOf[AnyRef] + + /** + * Java API: The identifier of the extension + */ + def id: ExtensionId[T] = this } /** @@ -144,3 +155,24 @@ trait Extensions { def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean } +/** + * Each extension typically provide a concrete `ExtensionSetup` that can be used in + * [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] to replace the default + * implementation of the extension. Intended for tests that need to replace + * extension with stub/mock implementations. + */ +abstract class ExtensionSetup[T <: Extension]( + val extId: ExtensionId[T], + val createExtension: java.util.function.Function[ActorSystem[_], T]) + extends Setup + +/** + * Scala 2.11 API: Each extension typically provide a concrete `ExtensionSetup` that can be used in + * [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] to replace the default + * implementation of the extension. Intended for tests that need to replace + * extension with stub/mock implementations. + */ +abstract class AbstractExtensionSetup[T <: Extension](extId: ExtensionId[T], createExtension: ActorSystem[_] ⇒ T) + extends ExtensionSetup[T](extId, new java.util.function.Function[ActorSystem[_], T] { + override def apply(sys: ActorSystem[_]): T = createExtension.apply(sys) + }) // TODO can be simplified when compiled only with Scala >= 2.12 diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala index c8e5051069..fb1b47a95b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala @@ -8,11 +8,12 @@ import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch } import akka.annotation.InternalApi import akka.actor.typed.{ ActorSystem, Extension, ExtensionId, Extensions } - import scala.annotation.tailrec import scala.util.{ Failure, Success, Try } import scala.collection.JavaConverters._ +import akka.actor.typed.ExtensionSetup + /** * Actor system extensions registry * @@ -29,6 +30,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒ * Hook for ActorSystem to load extensions on startup */ @InternalApi private[akka] def loadExtensions(): Unit = { + /** * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) */ @@ -81,10 +83,13 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒ val inProcessOfRegistration = new CountDownLatch(1) extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process case null ⇒ try { // Signal was successfully sent - // Create and initialize the extension - ext.createExtension(self) match { + // Create and initialize the extension, first look for ExtensionSetup + val instance = self.settings.setup.setups.collectFirst { + case (_, extSetup: ExtensionSetup[_]) if extSetup.extId == ext ⇒ extSetup.createExtension(self) + }.getOrElse(ext.createExtension(self)) + instance match { case null ⇒ throw new IllegalStateException("Extension instance created as 'null' for extension [" + ext + "]") - case instance ⇒ + case instance: T @unchecked ⇒ // Replace our in process signal with the initialized extension extensions.replace(ext, inProcessOfRegistration, instance) instance diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala index 02591891da..c4cba544e7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala @@ -7,11 +7,12 @@ package akka.actor.typed.receptionist import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId } import akka.actor.typed.internal.receptionist._ import akka.annotation.DoNotInherit - import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.reflect.ClassTag +import akka.actor.typed.ExtensionSetup + class Receptionist(system: ActorSystem[_]) extends Extension { private def hasCluster: Boolean = { @@ -274,3 +275,19 @@ object Receptionist extends ExtensionId[Receptionist] { Listing(key, Set[ActorRef[T]](serviceInstances.asScala.toSeq: _*)) } + +object ReceptionistSetup { + def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ Receptionist): ReceptionistSetup = + new ReceptionistSetup(new java.util.function.Function[ActorSystem[_], Receptionist] { + override def apply(sys: ActorSystem[_]): Receptionist = createExtension(sys) + }) // TODO can be simplified when compiled only with Scala >= 2.12 + +} + +/** + * Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] + * to replace the default implementation of the [[Receptionist]] extension. Intended + * for tests that need to replace extension with stub/mock implementations. + */ +final class ReceptionistSetup(createExtension: java.util.function.Function[ActorSystem[_], Receptionist]) + extends ExtensionSetup[Receptionist](Receptionist, createExtension) diff --git a/akka-actor/src/main/scala/akka/actor/setup/ActorSystemSetup.scala b/akka-actor/src/main/scala/akka/actor/setup/ActorSystemSetup.scala index b39b070f2d..60d3a08382 100644 --- a/akka-actor/src/main/scala/akka/actor/setup/ActorSystemSetup.scala +++ b/akka-actor/src/main/scala/akka/actor/setup/ActorSystemSetup.scala @@ -10,6 +10,8 @@ import scala.annotation.varargs import scala.compat.java8.OptionConverters._ import scala.reflect.ClassTag +import akka.annotation.InternalApi + /** * Marker supertype for a setup part that can be put inside [[ActorSystemSetup]], if a specific concrete setup * is not specified in the actor system setup that means defaults are used (usually from the config file) - no concrete @@ -49,7 +51,7 @@ object ActorSystemSetup { * Constructor is *Internal API*. Use the factory methods [[ActorSystemSetup#create]] and [[akka.actor.Actor#apply]] to create * instances. */ -final class ActorSystemSetup private[akka] (setups: Map[Class[_], AnyRef]) { +final class ActorSystemSetup private[akka] (@InternalApi private[akka] val setups: Map[Class[_], AnyRef]) { /** * Java API: Extract a concrete [[Setup]] of type `T` if it is defined in the settings. diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index ecac721af1..89143e982e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -9,7 +9,6 @@ import scala.concurrent.Future import akka.actor.Scheduler import akka.util.Timeout - import scala.reflect.ClassTag import akka.actor.typed.ActorRef @@ -17,6 +16,7 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.Extension import akka.actor.typed.ExtensionId +import akka.actor.typed.ExtensionSetup import akka.actor.typed.Props import akka.annotation.DoNotInherit import akka.annotation.InternalApi @@ -305,3 +305,18 @@ object EntityRef { } } +object ClusterShardingSetup { + def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ ClusterSharding): ClusterShardingSetup = + new ClusterShardingSetup(new java.util.function.Function[ActorSystem[_], ClusterSharding] { + override def apply(sys: ActorSystem[_]): ClusterSharding = createExtension(sys) + }) // TODO can be simplified when compiled only with Scala >= 2.12 + +} + +/** + * Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] + * to replace the default implementation of the [[ClusterSharding]] extension. Intended + * for tests that need to replace extension with stub/mock implementations. + */ +final class ClusterShardingSetup(createExtension: java.util.function.Function[ActorSystem[_], ClusterSharding]) + extends ExtensionSetup[ClusterSharding](ClusterSharding, createExtension) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala index 99c5bd0736..6ed83bb7c7 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/javadsl/DistributedData.scala @@ -8,6 +8,7 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.Extension import akka.actor.typed.ExtensionId import akka.actor.typed.ActorRef +import akka.actor.typed.ExtensionSetup object DistributedData extends ExtensionId[DistributedData] { def get(system: ActorSystem[_]): DistributedData = apply(system) @@ -35,3 +36,18 @@ class DistributedData(system: ActorSystem[_]) extends Extension { } +object DistributedDataSetup { + def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ DistributedData): DistributedDataSetup = + new DistributedDataSetup(new java.util.function.Function[ActorSystem[_], DistributedData] { + override def apply(sys: ActorSystem[_]): DistributedData = createExtension(sys) + }) // TODO can be simplified when compiled only with Scala >= 2.12 + +} + +/** + * Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] + * to replace the default implementation of the [[DistributedData]] extension. Intended + * for tests that need to replace extension with stub/mock implementations. + */ +final class DistributedDataSetup(createExtension: java.util.function.Function[ActorSystem[_], DistributedData]) + extends ExtensionSetup[DistributedData](DistributedData, createExtension) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala index a7978a0cb2..5e638e1fbe 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala @@ -11,9 +11,10 @@ import akka.cluster._ import akka.japi.Util import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId } import akka.cluster.typed.internal.AdapterClusterImpl - import scala.collection.immutable +import akka.actor.typed.ExtensionSetup + /** * Messages for subscribing to changes in the cluster state * @@ -183,3 +184,19 @@ abstract class Cluster extends Extension { def manager: ActorRef[ClusterCommand] } + +object ClusterSetup { + def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ Cluster): ClusterSetup = + new ClusterSetup(new java.util.function.Function[ActorSystem[_], Cluster] { + override def apply(sys: ActorSystem[_]): Cluster = createExtension(sys) + }) // TODO can be simplified when compiled only with Scala >= 2.12 + +} + +/** + * Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] + * to replace the default implementation of the [[Cluster]] extension. Intended + * for tests that need to replace extension with stub/mock implementations. + */ +final class ClusterSetup(createExtension: java.util.function.Function[ActorSystem[_], Cluster]) + extends ExtensionSetup[Cluster](Cluster, createExtension) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala index a95aa1945a..ce9c701525 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala @@ -13,9 +13,10 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionI import akka.util.JavaDurationConverters._ import com.typesafe.config.Config import scala.concurrent.duration._ - import scala.concurrent.duration.{ Duration, FiniteDuration } +import akka.actor.typed.ExtensionSetup + object ClusterSingletonSettings { def apply( system: ActorSystem[_] @@ -236,3 +237,18 @@ final class ClusterSingletonManagerSettings( new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) } +object ClusterSingletonSetup { + def apply[T <: Extension](createExtension: ActorSystem[_] ⇒ ClusterSingleton): ClusterSingletonSetup = + new ClusterSingletonSetup(new java.util.function.Function[ActorSystem[_], ClusterSingleton] { + override def apply(sys: ActorSystem[_]): ClusterSingleton = createExtension(sys) + }) // TODO can be simplified when compiled only with Scala >= 2.12 + +} + +/** + * Can be used in [[akka.actor.setup.ActorSystemSetup]] when starting the [[ActorSystem]] + * to replace the default implementation of the [[ClusterSingleton]] extension. Intended + * for tests that need to replace extension with stub/mock implementations. + */ +final class ClusterSingletonSetup(createExtension: java.util.function.Function[ActorSystem[_], ClusterSingleton]) + extends ExtensionSetup[ClusterSingleton](ClusterSingleton, createExtension)