diff --git a/akka-contrib/docs/cluster-sharding.rst b/akka-contrib/docs/cluster-sharding.rst index eaa681830b..6971201426 100644 --- a/akka-contrib/docs/cluster-sharding.rst +++ b/akka-contrib/docs/cluster-sharding.rst @@ -30,9 +30,11 @@ This is how an entry actor may look like: .. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-actor The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state. -It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover +It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover its state if it is valuable. +Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique. + When using the sharding extension you are first, typically at system startup on each node in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` method. ``ClusterSharding.start`` gives you the reference which you can pass along. @@ -74,9 +76,11 @@ This is how an entry actor may look like: .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-actor The above actor uses event sourcing and the support provided in ``PersistentActor`` to store its state. -It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover +It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover its state if it is valuable. +Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique. + When using the sharding extension you are first, typically at system startup on each node in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` method. ``ClusterSharding.start`` gives you the reference which you can pass along. diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index 25e89bf060..cf2403403d 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -77,6 +77,10 @@ object ClusterShardingSpec extends MultiNodeConfig { context.setReceiveTimeout(120.seconds) + // self.path.parent.name is the type name (utf-8 URL-encoded) + // self.path.name is the entry identifier (utf-8 URL-encoded) + override def persistenceId: String = self.path.parent.name + "-" + self.path.name + var count = 0 //#counter-actor diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java index 8427ce2f4d..e42c11cec4 100644 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java @@ -111,6 +111,13 @@ public class ClusterShardingTest { } int count = 0; + + // getSelf().path().parent().name() is the type name (utf-8 URL-encoded) + // getSelf().path().name() is the entry identifier (utf-8 URL-encoded) + @Override + public String persistenceId() { + return getSelf().path().parent().name() + "-" + getSelf().path().name(); + } @Override public void preStart() throws Exception { diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 460b6ba041..1a64c3110a 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -133,6 +133,9 @@ public class PersistenceDocTest { } class MyProcessor5 extends UntypedPersistentActor { + @Override + public String persistenceId() { return "persistence-id"; } + //#recovery-completed @Override @@ -356,6 +359,9 @@ public class PersistenceDocTest { static Object o8 = new Object() { //#reliable-event-delivery class MyPersistentActor extends UntypedPersistentActor { + @Override + public String persistenceId() { return "some-persistence-id"; } + private ActorRef destination; private ActorRef channel; @@ -394,6 +400,8 @@ public class PersistenceDocTest { static Object o9 = new Object() { //#persist-async class MyPersistentActor extends UntypedPersistentActor { + @Override + public String persistenceId() { return "some-persistence-id"; } @Override public void onReceiveRecover(Object msg) { @@ -441,6 +449,8 @@ public class PersistenceDocTest { static Object o10 = new Object() { //#defer class MyPersistentActor extends UntypedPersistentActor { + @Override + public String persistenceId() { return "some-persistence-id"; } @Override public void onReceiveRecover(Object msg) { @@ -486,8 +496,11 @@ public class PersistenceDocTest { static Object o11 = new Object() { //#view class MyView extends UntypedPersistentView { - @Override public String viewId() { return "some-persistence-id-view"; } - @Override public String persistenceId() { return "some-persistence-id"; } + @Override + public String persistenceId() { return "some-persistence-id"; } + + @Override + public String viewId() { return "my-stable-persistence-view-id"; } @Override public void onReceive(Object message) throws Exception { @@ -499,7 +512,6 @@ public class PersistenceDocTest { unhandled(message); } } - } //#view diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 5f52f84726..cca5b9f18b 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -105,7 +105,7 @@ is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is de .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The -``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. +``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``. The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example, @@ -139,20 +139,11 @@ It contains instructions on how to run the ``PersistentActorExample``. Identifiers ----------- -A persistent actor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of persistent actor's path without the address part and can be obtained via the ``persistenceId`` -method. - -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id - -Applications can customize a persistent actor's id by specifying an actor name during persistent actor creation as shown in -section :ref:`event-sourcing-java-lambda`. This changes that persistent actor's name in its actor hierarchy and hence influences only -part of the persistent actor id. To fully customize a persistent actor's id, the ``persistenceId`` method must be overridden. +A persistent actor must have an identifier that doesn't change across different actor incarnations. +The identifier must be defined with the ``persistenceId`` method. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id-override -Overriding ``persistenceId`` is the recommended way to generate stable identifiers. - .. _recovery-java-lambda: Recovery @@ -329,13 +320,8 @@ Further possibilities to customize initial recovery are explained in section :re Identifiers ----------- -A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` -method. - -Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's -name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the -``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. +A persistent view must have an identifier that doesn't change across different actor incarnations. +The identifier must be defined with the ``viewId`` method. The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its persistent actor shall be shared (which is what applications usually do not want). diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 5a9609d3b6..453b761295 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -107,7 +107,7 @@ is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is de .. includecode:: ../../../akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The -``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. +``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``. The persistent actor's ``onReceiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` and ``SnapshotOffer`` messages. The persistent actor's ``onReceiveCommand`` method is a command handler. In this example, @@ -141,19 +141,11 @@ It contains instructions on how to run the ``PersistentActorExample``. Identifiers ----------- -A persistent actor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of persistent actor's path without the address part and can be obtained via the ``persistenceId`` -method. - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id - -Applications can customize a persistent actor's id by specifying an actor name during persistent actor creation as shown in -section :ref:`event-sourcing-java`. This changes that processor's name in its actor hierarchy and hence influences only -part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden. +A persistent actor must have an identifier that doesn't change across different actor incarnations. +The identifier must be defined with the ``persistenceId`` method. .. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id-override -Overriding ``persistenceId`` is the recommended way to generate stable identifiers. .. _recovery-java: @@ -334,13 +326,8 @@ Further possibilities to customize initial recovery are explained in section :re Identifiers ----------- -A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` -method. - -Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's -name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the -``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. +A persistent view must have an identifier that doesn't change across different actor incarnations. +The identifier must be defined with the ``viewId`` method. The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its persistent actor shall be shared (which is what applications usually do not want). diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index bf4f142336..8d701da9a6 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -88,6 +88,8 @@ trait PersistenceDocSpec { } class MyProcessor4 extends PersistentActor { + override def persistenceId = "my-stable-persistence-id" + //#recovery-completed def receiveRecover: Receive = { @@ -310,6 +312,8 @@ trait PersistenceDocSpec { class MyPersistentActor(destination: ActorRef) extends PersistentActor { val channel = context.actorOf(Channel.props("channel")) + override def persistenceId = "my-stable-persistence-id" + def handleEvent(event: String) = { // update state // ... @@ -338,6 +342,8 @@ trait PersistenceDocSpec { //#persist-async class MyPersistentActor extends PersistentActor { + override def persistenceId = "my-stable-persistence-id" + def receiveRecover: Receive = { case _ => // handle recovery here } @@ -372,6 +378,8 @@ trait PersistenceDocSpec { //#defer class MyPersistentActor extends PersistentActor { + override def persistenceId = "my-stable-persistence-id" + def receiveRecover: Receive = { case _ => // handle recovery here } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 73e41fe735..9bc30a957c 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -100,7 +100,7 @@ is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is de .. includecode:: ../../../akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala#persistent-actor-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The -``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. +``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``. The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example, @@ -134,20 +134,11 @@ It contains instructions on how to run the ``PersistentActorExample``. Identifiers ----------- -A persistent actor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of persistent actor's path without the address part and can be obtained via the ``persistenceId`` -method. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id - -Applications can customize a persistent actor's id by specifying an actor name during persistent actor creation as shown in -section :ref:`event-sourcing`. This changes that persistent actor's name in its actor hierarchy and hence influences only -part of the persistent actor id. To fully customize a persistent actor's id, the ``persistenceId`` method must be overridden. +A persistent actor must have an identifier that doesn't change across different actor incarnations. +The identifier must be defined with the ``persistenceId`` method. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id-override -Overriding ``persistenceId`` is the recommended way to generate stable identifiers. - .. _recovery: Recovery @@ -333,13 +324,8 @@ Further possibilities to customize initial recovery are explained in section :re Identifiers ----------- -A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` -method. - -Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's -name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the -``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. +A persistent view must have an identifier that doesn't change across different actor incarnations. +The identifier must be defined with the ``viewId`` method. The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its persistent actor shall be shared (which is what applications usually do not want). diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index e5ecc9aae7..a13df08bfc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -6,7 +6,7 @@ package akka.persistence import java.lang.{ Iterable ⇒ JIterable } -import akka.actor.AbstractActor +import akka.actor.{ AbstractActor, UntypedActor } import akka.japi.{ Procedure, Util } import akka.persistence.JournalProtocol._ @@ -17,7 +17,7 @@ import scala.collection.immutable * * Event sourcing mixin for a [[Processor]]. */ -private[persistence] trait Eventsourced extends Processor { +private[persistence] trait Eventsourced extends ProcessorImpl { // TODO consolidate these traits as PersistentActor #15230 /** @@ -388,23 +388,15 @@ trait EventsourcedProcessor extends Processor with Eventsourced { /** * An persistent Actor - can be used to implement command or event sourcing. */ -// TODO remove EventsourcedProcessor / Processor #15230 -trait PersistentActor extends EventsourcedProcessor +trait PersistentActor extends ProcessorImpl with Eventsourced { + def receive = receiveCommand +} /** * Java API: an persistent actor - can be used to implement command or event sourcing. */ -abstract class UntypedPersistentActor extends UntypedEventsourcedProcessor -/** - * Java API: an persistent actor - can be used to implement command or event sourcing. - */ -abstract class AbstractPersistentActor extends AbstractEventsourcedProcessor +abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl with Eventsourced { -/** - * Java API: an event sourced processor. - */ -@deprecated("UntypedEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") -abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced { final def onReceive(message: Any) = onReceiveCommand(message) final def receiveRecover: Receive = { @@ -558,16 +550,10 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events } /** - * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]): - * command handler. Typically validates commands against current state (and/or by - * communication with other actors). On successful validation, one or more events are - * derived from a command and these events are then persisted by calling `persist`. - * Commands sent to event sourced processors must not be [[Persistent]] or - * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is - * thrown by the processor. + * Java API: an persistent actor - can be used to implement command or event sourcing. */ -@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") -abstract class AbstractEventsourcedProcessor extends AbstractActor with EventsourcedProcessor { +abstract class AbstractPersistentActor extends AbstractActor with PersistentActor with Eventsourced { + /** * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. It is guaranteed that no new commands will be received by a processor @@ -676,9 +662,28 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit = persistAsync(Util.immutableSeq(events))(event ⇒ handler(event)) - override def receive = super[EventsourcedProcessor].receive + override def receive = super[PersistentActor].receive - override def receive(receive: Receive): Unit = { - throw new IllegalArgumentException("Define the behavior by overriding receiveRecover and receiveCommand") - } } + +/** + * Java API: an event sourced processor. + */ +@deprecated("UntypedEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") +abstract class UntypedEventsourcedProcessor extends UntypedPersistentActor { + override def persistenceId: String = processorId +} + +/** + * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]): + * command handler. Typically validates commands against current state (and/or by + * communication with other actors). On successful validation, one or more events are + * derived from a command and these events are then persisted by calling `persist`. + * Commands sent to event sourced processors must not be [[Persistent]] or + * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is + * thrown by the processor. + */ +@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") +abstract class AbstractEventsourcedProcessor extends AbstractPersistentActor { + override def persistenceId: String = processorId +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index ec69966917..030539aa1a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -52,8 +52,17 @@ import akka.dispatch._ * @see [[PersistentBatch]] */ @deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") -trait Processor extends Actor with Recovery { - // todo remove Processor in favor of PersistentActor #15230 +trait Processor extends ProcessorImpl { + /** + * Persistence id. Defaults to this persistent-actors's path and can be overridden. + */ + override def persistenceId: String = processorId +} + +/** INTERNAL API */ +@deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") +private[akka] trait ProcessorImpl extends Actor with Recovery { + // TODO: remove Processor in favor of PersistentActor #15230 import JournalProtocol._ @@ -168,11 +177,6 @@ trait Processor extends Actor with Recovery { @deprecated("Override `persistenceId: String` instead. Processor will be removed.", since = "2.3.4") override def processorId: String = _persistenceId // TODO: remove processorId - /** - * Persistence id. Defaults to this persistent-actors's path and can be overridden. - */ - override def persistenceId: String = processorId - /** * Returns `persistenceId`. */ diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index be5f3744b1..4486a7fadc 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -138,6 +138,10 @@ public class LambdaPersistenceDocTest { //#recovery-completed class MyPersistentActor5 extends AbstractPersistentActor { + @Override public String persistenceId() { + return "my-stable-persistence-id"; + } + @Override public PartialFunction receiveRecover() { return ReceiveBuilder. match(String.class, this::handleEvent).build(); @@ -374,6 +378,10 @@ public class LambdaPersistenceDocTest { this.channel = context().actorOf(Channel.props(), "channel"); } + @Override public String persistenceId() { + return "my-stable-persistence-id"; + } + private void handleEvent(String event) { // update state // ... @@ -401,6 +409,10 @@ public class LambdaPersistenceDocTest { //#persist-async class MyPersistentActor extends AbstractPersistentActor { + @Override public String persistenceId() { + return "my-stable-persistence-id"; + } + private void handleCommand(String c) { sender().tell(c, self()); @@ -447,6 +459,10 @@ public class LambdaPersistenceDocTest { //#defer class MyPersistentActor extends AbstractPersistentActor { + @Override public String persistenceId() { + return "my-stable-persistence-id"; + } + private void handleCommand(String c) { persistAsync(String.format("evt-%s-1", c), e -> { sender().tell(e, self()); diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java index 7e4d75ab00..3313274aa8 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java @@ -74,12 +74,16 @@ class ExampleState implements Serializable { } class ExamplePersistentActor extends AbstractPersistentActor { + private ExampleState state = new ExampleState(); public int getNumEvents() { return state.size(); } + @Override + public String persistenceId() { return "sample-id-1"; } + @Override public PartialFunction receiveRecover() { return ReceiveBuilder. diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorFailureExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorFailureExample.java index 3c56040b44..7d87d97363 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorFailureExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorFailureExample.java @@ -18,6 +18,9 @@ public class PersistentActorFailureExample { public static class ExamplePersistentActor extends AbstractPersistentActor { private ArrayList received = new ArrayList(); + @Override + public String persistenceId() { return "sample-id-2"; } + @Override public PartialFunction receiveCommand() { return ReceiveBuilder. diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java deleted file mode 100644 index bbca9bd5d8..0000000000 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package sample.persistence; - -import akka.actor.AbstractActor; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.japi.pf.ReceiveBuilder; -import akka.persistence.*; -import scala.PartialFunction; -import scala.runtime.BoxedUnit; - -public class ProcessorChannelExample { - public static class ExamplePersistentActor extends AbstractProcessor { - private ActorRef destination; - private ActorRef channel; - - public ExamplePersistentActor(ActorRef destination) { - this.destination = destination; - this.channel = context().actorOf(Channel.props(), "channel"); - - receive(ReceiveBuilder. - match(Persistent.class, p -> { - System.out.println("processed " + p.payload()); - channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self()); - }). - match(String.class, s -> System.out.println("reply = " + s)).build() - ); - } - } - - public static class ExampleDestination extends AbstractActor { - public ExampleDestination() { - receive(ReceiveBuilder. - match(ConfirmablePersistent.class, cp -> { - System.out.println("received " + cp.payload()); - sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null); - cp.confirm(); - }).build() - ); - } - } - - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); - final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class)); - final ActorRef processor = system.actorOf(Props.create(ExamplePersistentActor.class, destination), "processor-1"); - - processor.tell(Persistent.create("a"), null); - processor.tell(Persistent.create("b"), null); - - Thread.sleep(1000); - system.shutdown(); - } -} diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java index 903ef982cd..212deee006 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java @@ -62,6 +62,9 @@ public class SnapshotExample { build(); } + @Override + public String persistenceId() { return "sample-id-3"; } + @Override public PartialFunction receiveRecover() { return ReceiveBuilder. diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java index 7b3659cf99..e6d973c369 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java @@ -21,9 +21,7 @@ public class ViewExample { private int count = 1; @Override - public String persistenceId() { - return "persistentActor-5"; - } + public String persistenceId() { return "sample-id-4"; } @Override public PartialFunction receiveCommand() { @@ -49,10 +47,8 @@ public class ViewExample { private int numReplicated = 0; - @Override public String persistenceId() { return "persistentActor-5"; } - @Override public String viewId() { - return "view-5"; - } + @Override public String persistenceId() { return "sample-id-4"; } + @Override public String viewId() { return "sample-view-id-4"; } public ExampleView() { receive(ReceiveBuilder. diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java index 38247d2d26..528e92500e 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java @@ -66,6 +66,9 @@ class ExampleState implements Serializable { } class ExamplePersistentActor extends UntypedPersistentActor { + @Override + public String persistenceId() { return "sample-id-1"; } + private ExampleState state = new ExampleState(); public int getNumEvents() { diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorFailureExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorFailureExample.java index 277a7b3806..0db0bbc4fa 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorFailureExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorFailureExample.java @@ -1,12 +1,18 @@ package sample.persistence; -import java.util.ArrayList; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import akka.japi.Procedure; -import akka.actor.*; -import akka.persistence.*; +import akka.persistence.UntypedPersistentActor; + +import java.util.ArrayList; public class PersistentActorFailureExample { public static class ExamplePersistentActor extends UntypedPersistentActor { + @Override + public String persistenceId() { return "sample-id-2"; } + private ArrayList received = new ArrayList(); @Override @@ -35,7 +41,6 @@ public class PersistentActorFailureExample { unhandled(message); } } - } public static void main(String... args) throws Exception { diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java index dc2c713581..b84ad675d5 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentViewExample.java @@ -13,13 +13,11 @@ import java.util.concurrent.TimeUnit; public class PersistentViewExample { public static class ExamplePersistentActor extends UntypedPersistentActor { - private int count = 1; - @Override - public String persistenceId() { - return "persistentActor-5"; - } - + public String persistenceId() { return "sample-id-4"; } + + private int count = 1; + @Override public void onReceiveRecover(Object message) { if (message instanceof String) { @@ -49,8 +47,8 @@ public class PersistentViewExample { private int numReplicated = 0; - @Override public String persistenceId() { return "persistentActor-5"; } - @Override public String viewId() { return "view-5"; } + @Override public String persistenceId() { return "sample-id-4"; } + @Override public String viewId() { return "sample-view-id-4"; } @Override public void onReceive(Object message) throws Exception { diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java index 8407685981..d3734a7507 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java @@ -1,12 +1,17 @@ package sample.persistence; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.Procedure; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; +import akka.persistence.UntypedPersistentActor; + import java.io.Serializable; import java.util.ArrayList; -import akka.actor.*; -import akka.persistence.*; -import akka.japi.Procedure; - public class SnapshotExample { public static class ExampleState implements Serializable { private final ArrayList received; @@ -34,6 +39,9 @@ public class SnapshotExample { } public static class ExamplePersistentActor extends UntypedPersistentActor { + @Override + public String persistenceId() { return "sample-id-3"; } + private ExampleState state = new ExampleState(); @Override @@ -72,6 +80,7 @@ public class SnapshotExample { unhandled(message); } } + } public static void main(String... args) throws Exception { diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala index 0165e5bc22..bf54a32da1 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala @@ -4,20 +4,22 @@ package sample.persistence import akka.actor._ import akka.persistence._ -final case class Cmd(data: String) -final case class Evt(data: String) +case class Cmd(data: String) +case class Evt(data: String) -final case class ExampleState(events: List[String] = Nil) { - def update(evt: Evt) = copy(evt.data :: events) - def size = events.length +case class ExampleState(events: List[String] = Nil) { + def updated(evt: Evt): ExampleState = copy(evt.data :: events) + def size: Int = events.length override def toString: String = events.reverse.toString } -class ExampleProcessor extends PersistentActor { +class ExamplePersistentActor extends PersistentActor { + override def persistenceId = "sample-id-1" + var state = ExampleState() def updateState(event: Evt): Unit = - state = state.update(event) + state = state.updated(event) def numEvents = state.size @@ -44,14 +46,14 @@ class ExampleProcessor extends PersistentActor { object PersistentActorExample extends App { val system = ActorSystem("example") - val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala") + val persistentActor = system.actorOf(Props[ExamplePersistentActor], "persistentActor-4-scala") - processor ! Cmd("foo") - processor ! Cmd("baz") - processor ! Cmd("bar") - processor ! "snap" - processor ! Cmd("buzz") - processor ! "print" + persistentActor ! Cmd("foo") + persistentActor ! Cmd("baz") + persistentActor ! Cmd("bar") + persistentActor ! "snap" + persistentActor ! Cmd("buzz") + persistentActor ! "print" Thread.sleep(1000) system.shutdown() diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorFailureExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorFailureExample.scala index 6341d3d9a2..2c9469059e 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorFailureExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorFailureExample.scala @@ -5,6 +5,8 @@ import akka.persistence._ object PersistentActorFailureExample extends App { class ExamplePersistentActor extends PersistentActor { + override def persistenceId = "sample-id-2" + var received: List[String] = Nil // state def receiveCommand: Actor.Receive = { diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala index 48605e1558..492711ffb9 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala @@ -10,6 +10,8 @@ object SnapshotExample extends App { } class ExamplePersistentActor extends PersistentActor { + def persistenceId: String = "sample-id-3" + var state = ExampleState() def receiveCommand: Actor.Receive = { @@ -28,6 +30,7 @@ object SnapshotExample extends App { case evt: String => state = state.updated(evt) } + } val system = ActorSystem("example") diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala index dbfaa88f93..bf6ad89078 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala @@ -7,7 +7,7 @@ import akka.persistence._ object ViewExample extends App { class ExamplePersistentActor extends PersistentActor { - override def persistenceId = "persistentActor-5" + override def persistenceId = "sample-id-4" var count = 1 @@ -27,8 +27,8 @@ object ViewExample extends App { class ExampleView extends PersistentView { private var numReplicated = 0 - override def persistenceId: String = "persistentActor-5" - override def viewId = "view-5" + override def persistenceId: String = "sample-id-4" + override def viewId = "sample-view-id-4" def receive = { case "snap" =>