diff --git a/akka-docs/src/main/paradox/typed/persistence-testing.md b/akka-docs/src/main/paradox/typed/persistence-testing.md index 8261e92d3f..f7aa50ee5a 100644 --- a/akka-docs/src/main/paradox/typed/persistence-testing.md +++ b/akka-docs/src/main/paradox/typed/persistence-testing.md @@ -188,4 +188,25 @@ For tests that involve more than one Cluster node you have to use another journa While it's possible to use the @ref:[Persistence Plugin Proxy](../persistence-plugins.md#persistence-plugin-proxy) it's often better and more realistic to use a real database. -See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128). +See [akka-samples issue #128](https://github.com/akka/akka-samples/issues/128). + +### Plugin initialization + +Some Persistence plugins create tables automatically, but has the limitation that it can't be done concurrently +from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize +the plugins at the same time. To coordinate initialization you can use the `PersistenceInit` utility. + +`PersistenceInit` is part of `akka-persistence-testkit` and you need to add the dependency to your project: + +@@dependency[sbt,Maven,Gradle] { + group="com.typesafe.akka" + artifact="akka-persistence-testkit_$scala.binary_version$" + version="$akka.version$" +} + +Scala +: @@snip [PersistenceInitSpec.scala](/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala) { #imports #init } + +Java +: @@snip [PersistenceInitTest.java](/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java) { #imports #init } + diff --git a/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java new file mode 100644 index 0000000000..710cb0a35e --- /dev/null +++ b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceInitTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.persistence.testkit; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; + +import com.typesafe.config.ConfigFactory; +import jdocs.AbstractJavaTest; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.UUID; + +// #imports +import akka.persistence.testkit.javadsl.PersistenceInit; +import akka.Done; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +// #imports + +public class PersistenceInitTest extends AbstractJavaTest { + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource( + ConfigFactory.parseString( + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n" + + "akka.persistence.journal.inmem.test-serialization = on \n" + + "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n" + + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-" + + UUID.randomUUID().toString() + + "\" \n") + .withFallback(ConfigFactory.defaultApplication())); + + @Test + public void testInit() throws Exception { + // #init + Duration timeout = Duration.ofSeconds(5); + CompletionStage done = + PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout); + done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS); + // #init + } +} diff --git a/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala b/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala new file mode 100644 index 0000000000..24227b270e --- /dev/null +++ b/akka-docs/src/test/scala/docs/persistence/testkit/PersistenceInitSpec.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.persistence.testkit + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import org.scalatest.wordspec.AnyWordSpecLike + +//#imports +import akka.persistence.testkit.scaladsl.PersistenceInit + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ + +//#imports + +class PersistenceInitSpec + extends ScalaTestWithActorTestKit( + """ + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}" + """) + with AnyWordSpecLike { + + "PersistenceInit" should { + "initialize plugins" in { + //#init + val timeout = 5.seconds + val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout) + Await.result(done, timeout) + //#init + } + } +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala new file mode 100644 index 0000000000..bde769571d --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/PersistenceInitImpl.scala @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.internal + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorLogging +import akka.actor.Props +import akka.annotation.InternalApi +import akka.persistence.PersistentActor +import akka.persistence.RecoveryCompleted + +/** + * INTERNAL API + */ +@InternalApi private[akka] object PersistenceInitImpl { + + def props(journalPluginId: String, snapshotPluginId: String, persistenceId: String): Props = { + Props(new PersistenceInitImpl(journalPluginId, snapshotPluginId, persistenceId)) + } +} + +/** + * INTERNAL API: Initialize a journal and snapshot plugin by starting this `PersistentActor` + * and send any message to it. It will reply to the `sender()` with the same message when + * recovery has completed. + */ +@InternalApi private[akka] class PersistenceInitImpl( + override val journalPluginId: String, + override val snapshotPluginId: String, + override val persistenceId: String) + extends PersistentActor + with ActorLogging { + + private val startTime = System.nanoTime() + + def receiveRecover: Receive = { + case RecoveryCompleted => + log.debug( + "Initialization completed for journal [{}] and snapshot [{}] plugins, with persistenceId [{}], in [{} ms]", + journalPluginId, + snapshotPluginId, + persistenceId, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) + case _ => + } + + def receiveCommand: Receive = { + case msg => + // recovery has completed + sender() ! msg + context.stop(self) + } +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala new file mode 100644 index 0000000000..7f1c62dc7a --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceInit.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.javadsl + +import java.time.Duration +import java.util.concurrent.CompletionStage + +import scala.compat.java8.FutureConverters._ + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.persistence.testkit.scaladsl +import akka.util.JavaDurationConverters._ + +/** + * Test utility to initialize persistence plugins. Useful when initialization order or coordination + * is needed. For example to avoid creating tables concurrently. + */ +object PersistenceInit { + + /** + * Initialize the default journal and snapshot plugins. + * + * @return a `CompletionStage` that is completed when the initialization has completed + */ + def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: Duration): CompletionStage[Done] = + initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout) + + /** + * Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * + * @return a `CompletionStage` that is completed when the initialization has completed + */ + def initializePlugins( + system: ClassicActorSystemProvider, + journalPluginId: String, + snapshotPluginId: String, + timeout: Duration): CompletionStage[Done] = + scaladsl.PersistenceInit.initializePlugins(system, journalPluginId, snapshotPluginId, timeout.asScala).toJava + +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala new file mode 100644 index 0000000000..2e61a5135f --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceInit.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.scaladsl + +import java.util.UUID + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +import akka.Done +import akka.actor.ClassicActorSystemProvider +import akka.actor.ExtendedActorSystem +import akka.persistence.testkit.internal.PersistenceInitImpl +import akka.util.Timeout + +/** + * Test utility to initialize persistence plugins. Useful when initialization order or coordination + * is needed. For example to avoid creating tables concurrently. + */ +object PersistenceInit { + + /** + * Initialize the default journal and snapshot plugins. + * + * @return a `Future` that is completed when the initialization has completed + */ + def initializeDefaultPlugins(system: ClassicActorSystemProvider, timeout: FiniteDuration): Future[Done] = + initializePlugins(system, journalPluginId = "", snapshotPluginId = "", timeout) + + /** + * Initialize the given journal and snapshot plugins. + * + * The `snapshotPluginId` can be empty (`""`) if snapshot plugin isn't used. + * + * @return a `Future` that is completed when the initialization has completed + */ + def initializePlugins( + system: ClassicActorSystemProvider, + journalPluginId: String, + snapshotPluginId: String, + timeout: FiniteDuration): Future[Done] = { + val persistenceId: String = s"persistenceInit-${UUID.randomUUID()}" + val extSystem = system.classicSystem.asInstanceOf[ExtendedActorSystem] + val ref = + extSystem.systemActorOf( + PersistenceInitImpl.props(journalPluginId, snapshotPluginId, persistenceId), + persistenceId) + import akka.pattern.ask + import extSystem.dispatcher + implicit val askTimeout: Timeout = timeout + (ref ? "start").map(_ => Done) + } +}