From 414b28f518d35610ea578b1f19bf953d81eb5dac Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 25 Mar 2020 18:46:47 +0100 Subject: [PATCH 1/3] Persistence initialization utility #28808 --- .../scala/akka/persistence/Persistence.scala | 49 ++++++++++++++--- .../akka/persistence/PersistenceInit.scala | 53 +++++++++++++++++++ 2 files changed, 94 insertions(+), 8 deletions(-) create mode 100644 akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 1c168e357c..03564bc58b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -4,23 +4,29 @@ package akka.persistence +import java.util.UUID import java.util.concurrent.atomic.AtomicReference import java.util.function.Consumer -import akka.actor._ -import akka.event.{ Logging, LoggingAdapter } -import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters } -import akka.util.Collections.EmptyImmutableSeq -import akka.util.Helpers.ConfigOps -import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec +import scala.concurrent.Future import scala.concurrent.duration._ - -import akka.util.Reflect import scala.util.control.NonFatal +import akka.Done +import akka.actor._ import akka.annotation.InternalApi import akka.annotation.InternalStableApi +import akka.event.Logging +import akka.event.LoggingAdapter +import akka.persistence.journal.EventAdapters +import akka.persistence.journal.IdentityEventAdapters +import akka.util.Collections.EmptyImmutableSeq +import akka.util.Helpers.ConfigOps +import akka.util.Reflect +import akka.util.Timeout +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory /** * Persistence configuration. @@ -281,6 +287,33 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def verifySnapshotPluginConfigExists(pluginConfig: Config, configPath: String): Unit = verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Snapshot store") + /** + * Scala API: Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * @return a `Future` that is completed when the initialization has completed + */ + def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: FiniteDuration): Future[Done] = { + val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}" + val ref = + system.systemActorOf(PersistenceInit.props(journalPluginId, snapshotPluginId, persistenceId), persistenceId) + import akka.pattern.ask + import system.dispatcher + implicit val askTimeout: Timeout = timeout + (ref ? "start").map(_ => Done) + } + + /** + * Java API: Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * @return a `Future` that is completed when the initialization has completed + */ + def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: java.time.Duration): Future[Done] = { + import akka.util.JavaDurationConverters._ + initializePlugin(journalPluginId, snapshotPluginId, timeout.asScala) + } + /** * Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters. * If no adapters are registered for a given journal the EventAdapters object will simply return the identity diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala b/akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala new file mode 100644 index 0000000000..54afc3e3eb --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ +package akka.persistence + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorLogging +import akka.actor.Props +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi private[akka] object PersistenceInit { + + def props(journalPluginId: String, snapshotPluginId: String, persistenceId: String): Props = { + Props(new PersistenceInit(journalPluginId, snapshotPluginId, persistenceId)) + } +} + +/** + * INTERNAL API: Initialize a journal and snapshot plugin by starting this `PersistentActor` + * and send any message to it. It will reply to the `sender()` with the same message when + * recovery has completed. + */ +@InternalApi private[akka] class PersistenceInit( + override val journalPluginId: String, + override val snapshotPluginId: String, + override val persistenceId: String) + extends PersistentActor + with ActorLogging { + + private val startTime = System.nanoTime() + + def receiveRecover: Receive = { + case RecoveryCompleted => + log.debug( + "Initialization completed for journal [{}] and snapshot [{}] plugins, with persistenceId [{}], in [{} ms]", + journalPluginId, + snapshotPluginId, + persistenceId, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) + case _ => + } + + def receiveCommand: Receive = { + case msg => + // recovery has completed + sender() ! msg + context.stop(self) + } +} From 7e07ee17a6e8783b8be1c2c9ed04f74f0a9a7954 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 30 Mar 2020 16:24:38 +0200 Subject: [PATCH 2/3] move to testkit, and docs --- .../main/paradox/typed/persistence-testing.md | 23 +++++++- .../testkit/PersistenceInitTest.java | 48 ++++++++++++++++ .../testkit/PersistenceInitSpec.scala | 38 +++++++++++++ .../internal/PersistenceInitImpl.scala | 11 ++-- .../testkit/javadsl/PersistenceInit.scala | 45 +++++++++++++++ .../testkit/scaladsl/PersistenceInit.scala | 55 +++++++++++++++++++ .../scala/akka/persistence/Persistence.scala | 53 ++++-------------- 7 files changed, 225 insertions(+), 48 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java create mode 100644 akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala rename akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala => akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala (79%) create mode 100644 akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala create mode 100644 akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala diff --git a/akka-docs/src/main/paradox/typed/persistence-testing.md b/akka-docs/src/main/paradox/typed/persistence-testing.md index 8261e92d3f..f7aa50ee5a 100644 --- a/akka-docs/src/main/paradox/typed/persistence-testing.md +++ b/akka-docs/src/main/paradox/typed/persistence-testing.md @@ -188,4 +188,25 @@ For tests that involve more than one Cluster node you have to use another journa While it's possible to use the @ref:[Persistence Plugin Proxy](../persistence-plugins.md#persistence-plugin-proxy) it's often better and more realistic to use a real database. -See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128). +See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128). + +### Plugin initialization + +Some Persistence plugins create tables automatically, but has the limitation that it can't be done concurrently +from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize +the plugins at the same time. To coordinate initialization you can use the `PersistenceInit` utility. + +`PersistenceInit` is part of `akka-persistence-testkit` and you need to add the dependency to your project: + +@@dependency[sbt,Maven,Gradle] { + group="com.typesafe.akka" + artifact="akka-persistence-testkit_$scala.binary_version$" + version="$akka.version$" +} + +Scala +: @@snip [PersistenceInitSpec.scala](/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala) { #imports #init } + +Java +: @@snip [PersistenceInitTest.java](/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java) { #imports #init } + diff --git a/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java new file mode 100644 index 0000000000..f74c8e2760 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.persistence.testkit; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; + +import com.typesafe.config.ConfigFactory; +import jdocs.AbstractJavaTest; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.UUID; + +//#imports +import akka.persistence.testkit.javadsl.PersistenceInit; +import akka.Done; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +//#imports + +public class PersistenceInitTest extends AbstractJavaTest { + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource( + ConfigFactory.parseString( + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n" + + "akka.persistence.journal.inmem.test-serialization = on \n" + + "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n" + + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-" + + UUID.randomUUID().toString() + + "\" \n" + ) + .withFallback(ConfigFactory.defaultApplication())); + + @Test + public void testInit() throws Exception { + //#init + Duration timeout = Duration.ofSeconds(5); + CompletionStage done = PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout); + done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS); + //#init + } +} diff --git a/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala b/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala new file mode 100644 index 0000000000..24227b270e --- /dev/null +++ b/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.persistence.testkit + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import org.scalatest.wordspec.AnyWordSpecLike + +//#imports +import akka.persistence.testkit.scaladsl.PersistenceInit + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ + +//#imports + +class PersistenceInitSpec + extends ScalaTestWithActorTestKit( + """ + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}" + """) + with AnyWordSpecLike { + + "PersistenceInit" should { + "initialize plugins" in { + //#init + val timeout = 5.seconds + val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout) + Await.result(done, timeout) + //#init + } + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala similarity index 79% rename from akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala rename to akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala index 54afc3e3eb..bde769571d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala @@ -1,21 +1,24 @@ /* * Copyright (C) 2020 Lightbend Inc. */ -package akka.persistence + +package akka.persistence.testkit.internal import java.util.concurrent.TimeUnit import akka.actor.ActorLogging import akka.actor.Props import akka.annotation.InternalApi +import akka.persistence.PersistentActor +import akka.persistence.RecoveryCompleted /** * INTERNAL API */ -@InternalApi private[akka] object PersistenceInit { +@InternalApi private[akka] object PersistenceInitImpl { def props(journalPluginId: String, snapshotPluginId: String, persistenceId: String): Props = { - Props(new PersistenceInit(journalPluginId, snapshotPluginId, persistenceId)) + Props(new PersistenceInitImpl(journalPluginId, snapshotPluginId, persistenceId)) } } @@ -24,7 +27,7 @@ import akka.annotation.InternalApi * and send any message to it. It will reply to the `sender()` with the same message when * recovery has completed. */ -@InternalApi private[akka] class PersistenceInit( +@InternalApi private[akka] class PersistenceInitImpl( override val journalPluginId: String, override val snapshotPluginId: String, override val persistenceId: String) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala new file mode 100644 index 0000000000..7f1c62dc7a --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.javadsl + +import java.time.Duration +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.persistence.testkit.scaladsl +import akka.util.JavaDurationConverters._ + +/** + * Test utility to initialize persistence plugins. Useful when initialization order or coordination + * is needed. For example to avoid creating tables concurrently. + */ +object PersistenceInit { + + /** + * Initialize the default journal and snapshot plugins. + * + * @return a `CompletionStage` that is completed when the initialization has completed + */ + def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: Duration): CompletionStage[Done] = + initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout) + + /** + * Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * + * @return a `CompletionStage` that is completed when the initialization has completed + */ + def initializePlugins( + system: ClassicActorSystemProvider, + journalPluginId: String, + snapshotPluginId: String, + timeout: Duration): CompletionStage[Done] = + scaladsl.PersistenceInit.initializePlugins(system, journalPluginId, snapshotPluginId, timeout.asScala).toJava + +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala new file mode 100644 index 0000000000..2e61a5135f --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.scaladsl + +import java.util.UUID + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.actor.ExtendedActorSystem +import akka.persistence.testkit.internal.PersistenceInitImpl +import akka.util.Timeout + +/** + * Test utility to initialize persistence plugins. Useful when initialization order or coordination + * is needed. For example to avoid creating tables concurrently. + */ +object PersistenceInit { + + /** + * Initialize the default journal and snapshot plugins. + * + * @return a `Future` that is completed when the initialization has completed + */ + def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: FiniteDuration): Future[Done] = + initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout) + + /** + * Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * + * @return a `Future` that is completed when the initialization has completed + */ + def initializePlugins( + system: ClassicActorSystemProvider, + journalPluginId: String, + snapshotPluginId: String, + timeout: FiniteDuration): Future[Done] = { + val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}" + val extSystem = system.classicSystem.asInstanceOf[ExtendedActorSystem] + val ref = + extSystem.systemActorOf( + PersistenceInitImpl.props(journalPluginId, snapshotPluginId, persistenceId), + persistenceId) + import akka.pattern.ask + import extSystem.dispatcher + implicit val askTimeout: Timeout = timeout + (ref ? "start").map(_ => Done) + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 03564bc58b..1c168e357c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -4,29 +4,23 @@ package akka.persistence -import java.util.UUID import java.util.concurrent.atomic.AtomicReference import java.util.function.Consumer -import scala.annotation.tailrec -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -import akka.Done import akka.actor._ -import akka.annotation.InternalApi -import akka.annotation.InternalStableApi -import akka.event.Logging -import akka.event.LoggingAdapter -import akka.persistence.journal.EventAdapters -import akka.persistence.journal.IdentityEventAdapters +import akka.event.{ Logging, LoggingAdapter } +import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters } import akka.util.Collections.EmptyImmutableSeq import akka.util.Helpers.ConfigOps +import com.typesafe.config.{ Config, ConfigFactory } +import scala.annotation.tailrec +import scala.concurrent.duration._ + import akka.util.Reflect -import akka.util.Timeout -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory +import scala.util.control.NonFatal + +import akka.annotation.InternalApi +import akka.annotation.InternalStableApi /** * Persistence configuration. @@ -287,33 +281,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def verifySnapshotPluginConfigExists(pluginConfig: Config, configPath: String): Unit = verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Snapshot store") - /** - * Scala API: Initialize the given journal and snapshot plugins. - * - * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. - * @return a `Future` that is completed when the initialization has completed - */ - def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: FiniteDuration): Future[Done] = { - val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}" - val ref = - system.systemActorOf(PersistenceInit.props(journalPluginId, snapshotPluginId, persistenceId), persistenceId) - import akka.pattern.ask - import system.dispatcher - implicit val askTimeout: Timeout = timeout - (ref ? "start").map(_ => Done) - } - - /** - * Java API: Initialize the given journal and snapshot plugins. - * - * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. - * @return a `Future` that is completed when the initialization has completed - */ - def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: java.time.Duration): Future[Done] = { - import akka.util.JavaDurationConverters._ - initializePlugin(journalPluginId, snapshotPluginId, timeout.asScala) - } - /** * Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters. * If no adapters are registered for a given journal the EventAdapters object will simply return the identity From 3c12f195769643ca99bb23829b8384a99ae22e88 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 30 Mar 2020 16:56:22 +0200 Subject: [PATCH 3/3] java format --- .../testkit/PersistenceInitTest.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java index f74c8e2760..710cb0a35e 100644 --- a/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java @@ -13,7 +13,7 @@ import org.junit.Test; import java.util.UUID; -//#imports +// #imports import akka.persistence.testkit.javadsl.PersistenceInit; import akka.Done; @@ -21,28 +21,28 @@ import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -//#imports +// #imports public class PersistenceInitTest extends AbstractJavaTest { @ClassRule public static final TestKitJunitResource testKit = - new TestKitJunitResource( - ConfigFactory.parseString( - "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n" - + "akka.persistence.journal.inmem.test-serialization = on \n" + - "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n" - + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-" - + UUID.randomUUID().toString() - + "\" \n" - ) - .withFallback(ConfigFactory.defaultApplication())); + new TestKitJunitResource( + ConfigFactory.parseString( + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n" + + "akka.persistence.journal.inmem.test-serialization = on \n" + + "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n" + + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-" + + UUID.randomUUID().toString() + + "\" \n") + .withFallback(ConfigFactory.defaultApplication())); @Test public void testInit() throws Exception { - //#init + // #init Duration timeout = Duration.ofSeconds(5); - CompletionStage done = PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout); + CompletionStage done = + PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout); done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS); - //#init + // #init } }