From 7e07ee17a6e8783b8be1c2c9ed04f74f0a9a7954 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 30 Mar 2020 16:24:38 +0200 Subject: [PATCH] move to testkit, and docs --- .../main/paradox/typed/persistence-testing.md | 23 +++++++- .../testkit/PersistenceInitTest.java | 48 ++++++++++++++++ .../testkit/PersistenceInitSpec.scala | 38 +++++++++++++ .../internal/PersistenceInitImpl.scala | 11 ++-- .../testkit/javadsl/PersistenceInit.scala | 45 +++++++++++++++ .../testkit/scaladsl/PersistenceInit.scala | 55 +++++++++++++++++++ .../scala/akka/persistence/Persistence.scala | 53 ++++-------------- 7 files changed, 225 insertions(+), 48 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java create mode 100644 akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala rename akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala => akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala (79%) create mode 100644 akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala create mode 100644 akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala diff --git a/akka-docs/src/main/paradox/typed/persistence-testing.md b/akka-docs/src/main/paradox/typed/persistence-testing.md index 8261e92d3f..f7aa50ee5a 100644 --- a/akka-docs/src/main/paradox/typed/persistence-testing.md +++ b/akka-docs/src/main/paradox/typed/persistence-testing.md @@ -188,4 +188,25 @@ For tests that involve more than one Cluster node you have to use another journa While it's possible to use the @ref:[Persistence Plugin Proxy](../persistence-plugins.md#persistence-plugin-proxy) it's often better and more realistic to use a real database. -See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128). +See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128). + +### Plugin initialization + +Some Persistence plugins create tables automatically, but has the limitation that it can't be done concurrently +from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize +the plugins at the same time. To coordinate initialization you can use the `PersistenceInit` utility. + +`PersistenceInit` is part of `akka-persistence-testkit` and you need to add the dependency to your project: + +@@dependency[sbt,Maven,Gradle] { + group="com.typesafe.akka" + artifact="akka-persistence-testkit_$scala.binary_version$" + version="$akka.version$" +} + +Scala +: @@snip [PersistenceInitSpec.scala](/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala) { #imports #init } + +Java +: @@snip [PersistenceInitTest.java](/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java) { #imports #init } + diff --git a/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java new file mode 100644 index 0000000000..f74c8e2760 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.persistence.testkit; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; + +import com.typesafe.config.ConfigFactory; +import jdocs.AbstractJavaTest; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.UUID; + +//#imports +import akka.persistence.testkit.javadsl.PersistenceInit; +import akka.Done; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +//#imports + +public class PersistenceInitTest extends AbstractJavaTest { + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource( + ConfigFactory.parseString( + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n" + + "akka.persistence.journal.inmem.test-serialization = on \n" + + "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n" + + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-" + + UUID.randomUUID().toString() + + "\" \n" + ) + .withFallback(ConfigFactory.defaultApplication())); + + @Test + public void testInit() throws Exception { + //#init + Duration timeout = Duration.ofSeconds(5); + CompletionStage done = PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout); + done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS); + //#init + } +} diff --git a/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala b/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala new file mode 100644 index 0000000000..24227b270e --- /dev/null +++ b/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.persistence.testkit + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import org.scalatest.wordspec.AnyWordSpecLike + +//#imports +import akka.persistence.testkit.scaladsl.PersistenceInit + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ + +//#imports + +class PersistenceInitSpec + extends ScalaTestWithActorTestKit( + """ + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}" + """) + with AnyWordSpecLike { + + "PersistenceInit" should { + "initialize plugins" in { + //#init + val timeout = 5.seconds + val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout) + Await.result(done, timeout) + //#init + } + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala similarity index 79% rename from akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala rename to akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala index 54afc3e3eb..bde769571d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistenceInit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala @@ -1,21 +1,24 @@ /* * Copyright (C) 2020 Lightbend Inc. */ -package akka.persistence + +package akka.persistence.testkit.internal import java.util.concurrent.TimeUnit import akka.actor.ActorLogging import akka.actor.Props import akka.annotation.InternalApi +import akka.persistence.PersistentActor +import akka.persistence.RecoveryCompleted /** * INTERNAL API */ -@InternalApi private[akka] object PersistenceInit { +@InternalApi private[akka] object PersistenceInitImpl { def props(journalPluginId: String, snapshotPluginId: String, persistenceId: String): Props = { - Props(new PersistenceInit(journalPluginId, snapshotPluginId, persistenceId)) + Props(new PersistenceInitImpl(journalPluginId, snapshotPluginId, persistenceId)) } } @@ -24,7 +27,7 @@ import akka.annotation.InternalApi * and send any message to it. It will reply to the `sender()` with the same message when * recovery has completed. */ -@InternalApi private[akka] class PersistenceInit( +@InternalApi private[akka] class PersistenceInitImpl( override val journalPluginId: String, override val snapshotPluginId: String, override val persistenceId: String) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala new file mode 100644 index 0000000000..7f1c62dc7a --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.javadsl + +import java.time.Duration +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.persistence.testkit.scaladsl +import akka.util.JavaDurationConverters._ + +/** + * Test utility to initialize persistence plugins. Useful when initialization order or coordination + * is needed. For example to avoid creating tables concurrently. + */ +object PersistenceInit { + + /** + * Initialize the default journal and snapshot plugins. + * + * @return a `CompletionStage` that is completed when the initialization has completed + */ + def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: Duration): CompletionStage[Done] = + initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout) + + /** + * Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * + * @return a `CompletionStage` that is completed when the initialization has completed + */ + def initializePlugins( + system: ClassicActorSystemProvider, + journalPluginId: String, + snapshotPluginId: String, + timeout: Duration): CompletionStage[Done] = + scaladsl.PersistenceInit.initializePlugins(system, journalPluginId, snapshotPluginId, timeout.asScala).toJava + +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala new file mode 100644 index 0000000000..2e61a5135f --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.scaladsl + +import java.util.UUID + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.actor.ExtendedActorSystem +import akka.persistence.testkit.internal.PersistenceInitImpl +import akka.util.Timeout + +/** + * Test utility to initialize persistence plugins. Useful when initialization order or coordination + * is needed. For example to avoid creating tables concurrently. + */ +object PersistenceInit { + + /** + * Initialize the default journal and snapshot plugins. + * + * @return a `Future` that is completed when the initialization has completed + */ + def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: FiniteDuration): Future[Done] = + initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout) + + /** + * Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * + * @return a `Future` that is completed when the initialization has completed + */ + def initializePlugins( + system: ClassicActorSystemProvider, + journalPluginId: String, + snapshotPluginId: String, + timeout: FiniteDuration): Future[Done] = { + val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}" + val extSystem = system.classicSystem.asInstanceOf[ExtendedActorSystem] + val ref = + extSystem.systemActorOf( + PersistenceInitImpl.props(journalPluginId, snapshotPluginId, persistenceId), + persistenceId) + import akka.pattern.ask + import extSystem.dispatcher + implicit val askTimeout: Timeout = timeout + (ref ? "start").map(_ => Done) + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 03564bc58b..1c168e357c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -4,29 +4,23 @@ package akka.persistence -import java.util.UUID import java.util.concurrent.atomic.AtomicReference import java.util.function.Consumer -import scala.annotation.tailrec -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -import akka.Done import akka.actor._ -import akka.annotation.InternalApi -import akka.annotation.InternalStableApi -import akka.event.Logging -import akka.event.LoggingAdapter -import akka.persistence.journal.EventAdapters -import akka.persistence.journal.IdentityEventAdapters +import akka.event.{ Logging, LoggingAdapter } +import akka.persistence.journal.{ EventAdapters, IdentityEventAdapters } import akka.util.Collections.EmptyImmutableSeq import akka.util.Helpers.ConfigOps +import com.typesafe.config.{ Config, ConfigFactory } +import scala.annotation.tailrec +import scala.concurrent.duration._ + import akka.util.Reflect -import akka.util.Timeout -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory +import scala.util.control.NonFatal + +import akka.annotation.InternalApi +import akka.annotation.InternalStableApi /** * Persistence configuration. @@ -287,33 +281,6 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private def verifySnapshotPluginConfigExists(pluginConfig: Config, configPath: String): Unit = verifyPluginConfigExists(pluginConfig.withFallback(system.settings.config), configPath, "Snapshot store") - /** - * Scala API: Initialize the given journal and snapshot plugins. - * - * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. - * @return a `Future` that is completed when the initialization has completed - */ - def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: FiniteDuration): Future[Done] = { - val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}" - val ref = - system.systemActorOf(PersistenceInit.props(journalPluginId, snapshotPluginId, persistenceId), persistenceId) - import akka.pattern.ask - import system.dispatcher - implicit val askTimeout: Timeout = timeout - (ref ? "start").map(_ => Done) - } - - /** - * Java API: Initialize the given journal and snapshot plugins. - * - * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. - * @return a `Future` that is completed when the initialization has completed - */ - def initializePlugin(journalPluginId: String, snapshotPluginId: String, timeout: java.time.Duration): Future[Done] = { - import akka.util.JavaDurationConverters._ - initializePlugin(journalPluginId, snapshotPluginId, timeout.asScala) - } - /** * Returns an [[akka.persistence.journal.EventAdapters]] object which serves as a per-journal collection of bound event adapters. * If no adapters are registered for a given journal the EventAdapters object will simply return the identity