From ee85d23a3e17d82913ae276cf704b57741418330 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 2 Apr 2018 12:59:41 +0200 Subject: [PATCH] use new Typed persistence impl in javadsl, #24753 (#24802) * don't use untyped * snapshot and tagging predicates * onRecoveryCompleted * actually run the java test by adding JUnitSuite --- .../typed/javadsl/PersistentBehavior.scala | 57 +++++++++++++++---- .../typed/javadsl/PersistentActorTest.java | 32 ++++++++--- .../scaladsl/PersistentBehaviorSpec.scala | 5 +- 3 files changed, 72 insertions(+), 22 deletions(-) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index 7325e18003..cbda330565 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -6,16 +6,17 @@ package akka.persistence.typed.javadsl import java.util.Collections -import akka.actor.typed.Behavior.UntypedPropsBehavior -import akka.actor.typed.internal.adapter.PropsAdapter +import akka.actor.typed +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.javadsl.ActorContext -import akka.annotation.{ ApiMayChange, InternalApi } +import akka.annotation.ApiMayChange import akka.persistence.typed._ import akka.persistence.typed.internal._ /** Java API */ @ApiMayChange -abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends UntypedPropsBehavior[Command] { +abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends DeferredBehavior[Command] { /** * Factory of effects. @@ -77,29 +78,61 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence */ def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {} + /** + * Override and define that snapshot should be saved every N events. + * + * If this is overridden `shouldSnapshot` is not used. + * + * @return number of events between snapshots, should be greater than 0 + * @see [[PersistentBehavior#shouldSnapshot]] + */ + def snapshotEvery(): Long = 0L + /** * Initiates a snapshot if the given function returns true. * When persisting multiple events at once the snapshot is triggered after all the events have * been persisted. * - * `predicate` receives the State, Event and the sequenceNr used for the Event + * receives the State, Event and the sequenceNr used for the Event + * + * @return `true` if snapshot should be saved for the given event + * @see [[PersistentBehavior#snapshotEvery]] */ def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false + /** * The `tagger` function should give event tags, which will be used in persistence query */ def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet() - /** INTERNAL API */ - @InternalApi private[akka] override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = { - val behaviorImpl = scaladsl.PersistentBehaviors.receive[Command, Event, State]( + /** + * INTERNAL API: DeferredBehavior init + */ + override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { + + val snapshotWhen: (State, Event, Long) ⇒ Boolean = { (state, event, seqNr) ⇒ + val n = snapshotEvery() + if (n > 0) + seqNr % n == 0 + else + shouldSnapshot(state, event, seqNr) + } + + val tagger: Event ⇒ Set[String] = { event ⇒ + import scala.collection.JavaConverters._ + val tags = tagsFor(event) + if (tags.isEmpty) Set.empty + else tags.asScala.toSet + } + + scaladsl.PersistentBehaviors.receive[Command, Event, State]( persistenceId, initialState, (c, state, cmd) ⇒ commandHandler()(c.asJava, state, cmd).asInstanceOf[EffectImpl[Event, State]], - eventHandler()(_, _) - ) - - PropsAdapter(() ⇒ behaviorImpl, props) + eventHandler()(_, _)) + .onRecoveryCompleted((ctx, state) ⇒ onRecoveryCompleted(ctx.asJava, state)) + .snapshotWhen(snapshotWhen) + .withTagger(tagger) } } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java index ac851c1473..039313fafb 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java @@ -5,25 +5,27 @@ package akka.persistence.typed.javadsl; import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; import akka.japi.function.Function3; -import akka.persistence.typed.scaladsl.PersistentBehaviorSpec$; +import akka.persistence.typed.scaladsl.PersistentBehaviorSpec; import akka.testkit.typed.javadsl.TestKitJunitResource; import akka.testkit.typed.javadsl.TestProbe; import org.junit.ClassRule; import org.junit.Test; +import org.scalatest.junit.JUnitSuite; import java.time.Duration; import java.util.*; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertEquals; -public class PersistentActorTest { +public class PersistentActorTest extends JUnitSuite { @ClassRule - public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentBehaviorSpec$.MODULE$.config()); + public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentBehaviorSpec.conf()); static final Incremented timeoutEvent = new Incremented(100); static final State emptyState = new State(0, Collections.emptyList()); @@ -256,7 +258,7 @@ public class PersistentActorTest { @Test public void persistEvents() { - ActorRef c = testKit.spawn(counter("c2")); + ActorRef c = testKit.spawn(counter("c1")); TestProbe probe = testKit.createTestProbe(); c.tell(Increment.instance); c.tell(new GetValue(probe.ref())); @@ -284,7 +286,7 @@ public class PersistentActorTest { @Test public void handleTerminatedSignal() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("c2", eventHandlerProbe.ref())); + ActorRef c = testKit.spawn(counter("c3", eventHandlerProbe.ref())); c.tell(Increment.instance); c.tell(new IncrementLater()); eventHandlerProbe.expectMessage(Pair.create(emptyState, new Incremented(1))); @@ -294,7 +296,7 @@ public class PersistentActorTest { @Test public void handleReceiveTimeout() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("c1", eventHandlerProbe.ref())); + ActorRef c = testKit.spawn(counter("c4", eventHandlerProbe.ref())); c.tell(new Increment100OnTimeout()); eventHandlerProbe.expectMessage(Pair.create(emptyState, timeoutEvent)); } @@ -303,11 +305,25 @@ public class PersistentActorTest { public void chainableSideEffectsWithEvents() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); TestProbe loggingProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("c1", eventHandlerProbe.ref(), loggingProbe.ref())); + ActorRef c = testKit.spawn(counter("c5", eventHandlerProbe.ref(), loggingProbe.ref())); c.tell(new EmptyEventsListAndThenLog()); loggingProbe.expectMessage(loggingOne); } + @Test + public void workWhenWrappedInOtherBehavior() { + Behavior behavior = Behaviors.supervise(counter("c6")).onFailure( + SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1), + Duration.ofSeconds(10), 0.1) + ); + ActorRef c = testKit.spawn(behavior); + + TestProbe probe = testKit.createTestProbe(); + c.tell(Increment.instance); + c.tell(new GetValue(probe.ref())); + probe.expectMessage(new State(1, singletonList(0))); + } + @Test public void snapshot() { PersistentBehavior snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0); diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index a8e4fa54ff..c90150e8df 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -35,7 +35,8 @@ object PersistentBehaviorSpec { def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) = ??? } - val config = ConfigFactory.parseString( + // also used from PersistentActorTest + val conf: Config = ConfigFactory.parseString( s""" akka.loglevel = INFO # akka.persistence.typed.log-stashing = INFO @@ -177,7 +178,7 @@ object PersistentBehaviorSpec { class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually { import PersistentBehaviorSpec._ - override def config: Config = PersistentBehaviorSpec.config + override def config: Config = PersistentBehaviorSpec.conf implicit val testSettings = TestKitSettings(system)