!per #15436 make persistenceId abstract in NEW classes

(cherry picked from commit de3249f7f4b859c3caa232e579d9a3bae7406803)

Conflicts:
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala
This commit is contained in:
Konrad 'ktoso' Malawski 2014-06-26 13:56:01 +02:00 committed by Patrik Nordwall
parent 43952af7ea
commit b1d1d87111
24 changed files with 182 additions and 193 deletions

View file

@ -30,9 +30,11 @@ This is how an entry actor may look like:
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-actor .. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-actor
The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state. The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state.
It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover
its state if it is valuable. its state if it is valuable.
Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node When using the sharding extension you are first, typically at system startup on each node
in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start``
method. ``ClusterSharding.start`` gives you the reference which you can pass along. method. ``ClusterSharding.start`` gives you the reference which you can pass along.
@ -74,9 +76,11 @@ This is how an entry actor may look like:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-actor .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-actor
The above actor uses event sourcing and the support provided in ``PersistentActor`` to store its state. The above actor uses event sourcing and the support provided in ``PersistentActor`` to store its state.
It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover
its state if it is valuable. its state if it is valuable.
Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node When using the sharding extension you are first, typically at system startup on each node
in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start``
method. ``ClusterSharding.start`` gives you the reference which you can pass along. method. ``ClusterSharding.start`` gives you the reference which you can pass along.

View file

@ -77,6 +77,10 @@ object ClusterShardingSpec extends MultiNodeConfig {
context.setReceiveTimeout(120.seconds) context.setReceiveTimeout(120.seconds)
// self.path.parent.name is the type name (utf-8 URL-encoded)
// self.path.name is the entry identifier (utf-8 URL-encoded)
override def persistenceId: String = self.path.parent.name + "-" + self.path.name
var count = 0 var count = 0
//#counter-actor //#counter-actor

View file

@ -112,6 +112,13 @@ public class ClusterShardingTest {
int count = 0; int count = 0;
// getSelf().path().parent().name() is the type name (utf-8 URL-encoded)
// getSelf().path().name() is the entry identifier (utf-8 URL-encoded)
@Override
public String persistenceId() {
return getSelf().path().parent().name() + "-" + getSelf().path().name();
}
@Override @Override
public void preStart() throws Exception { public void preStart() throws Exception {
super.preStart(); super.preStart();

View file

@ -133,6 +133,9 @@ public class PersistenceDocTest {
} }
class MyProcessor5 extends UntypedPersistentActor { class MyProcessor5 extends UntypedPersistentActor {
@Override
public String persistenceId() { return "persistence-id"; }
//#recovery-completed //#recovery-completed
@Override @Override
@ -356,6 +359,9 @@ public class PersistenceDocTest {
static Object o8 = new Object() { static Object o8 = new Object() {
//#reliable-event-delivery //#reliable-event-delivery
class MyPersistentActor extends UntypedPersistentActor { class MyPersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "some-persistence-id"; }
private ActorRef destination; private ActorRef destination;
private ActorRef channel; private ActorRef channel;
@ -394,6 +400,8 @@ public class PersistenceDocTest {
static Object o9 = new Object() { static Object o9 = new Object() {
//#persist-async //#persist-async
class MyPersistentActor extends UntypedPersistentActor { class MyPersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "some-persistence-id"; }
@Override @Override
public void onReceiveRecover(Object msg) { public void onReceiveRecover(Object msg) {
@ -441,6 +449,8 @@ public class PersistenceDocTest {
static Object o10 = new Object() { static Object o10 = new Object() {
//#defer //#defer
class MyPersistentActor extends UntypedPersistentActor { class MyPersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "some-persistence-id"; }
@Override @Override
public void onReceiveRecover(Object msg) { public void onReceiveRecover(Object msg) {
@ -486,8 +496,11 @@ public class PersistenceDocTest {
static Object o11 = new Object() { static Object o11 = new Object() {
//#view //#view
class MyView extends UntypedPersistentView { class MyView extends UntypedPersistentView {
@Override public String viewId() { return "some-persistence-id-view"; } @Override
@Override public String persistenceId() { return "some-persistence-id"; } public String persistenceId() { return "some-persistence-id"; }
@Override
public String viewId() { return "my-stable-persistence-view-id"; }
@Override @Override
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {
@ -499,7 +512,6 @@ public class PersistenceDocTest {
unhandled(message); unhandled(message);
} }
} }
} }
//#view //#view

View file

@ -105,7 +105,7 @@ is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is de
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. ``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``.
The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example, and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example,
@ -139,20 +139,11 @@ It contains instructions on how to run the ``PersistentActorExample``.
Identifiers Identifiers
----------- -----------
A persistent actor must have an identifier that doesn't change across different actor incarnations. It defaults to the A persistent actor must have an identifier that doesn't change across different actor incarnations.
``String`` representation of persistent actor's path without the address part and can be obtained via the ``persistenceId`` The identifier must be defined with the ``persistenceId`` method.
method.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id
Applications can customize a persistent actor's id by specifying an actor name during persistent actor creation as shown in
section :ref:`event-sourcing-java-lambda`. This changes that persistent actor's name in its actor hierarchy and hence influences only
part of the persistent actor id. To fully customize a persistent actor's id, the ``persistenceId`` method must be overridden.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id-override .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id-override
Overriding ``persistenceId`` is the recommended way to generate stable identifiers.
.. _recovery-java-lambda: .. _recovery-java-lambda:
Recovery Recovery
@ -329,13 +320,8 @@ Further possibilities to customize initial recovery are explained in section :re
Identifiers Identifiers
----------- -----------
A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the A persistent view must have an identifier that doesn't change across different actor incarnations.
``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` The identifier must be defined with the ``viewId`` method.
method.
Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's
name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the
``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java-lambda` of a view and its
persistent actor shall be shared (which is what applications usually do not want). persistent actor shall be shared (which is what applications usually do not want).

View file

@ -107,7 +107,7 @@ is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is de
.. includecode:: ../../../akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example .. includecode:: ../../../akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. ``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``.
The persistent actor's ``onReceiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` The persistent actor's ``onReceiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The persistent actor's ``onReceiveCommand`` method is a command handler. In this example, and ``SnapshotOffer`` messages. The persistent actor's ``onReceiveCommand`` method is a command handler. In this example,
@ -141,19 +141,11 @@ It contains instructions on how to run the ``PersistentActorExample``.
Identifiers Identifiers
----------- -----------
A persistent actor must have an identifier that doesn't change across different actor incarnations. It defaults to the A persistent actor must have an identifier that doesn't change across different actor incarnations.
``String`` representation of persistent actor's path without the address part and can be obtained via the ``persistenceId`` The identifier must be defined with the ``persistenceId`` method.
method.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id
Applications can customize a persistent actor's id by specifying an actor name during persistent actor creation as shown in
section :ref:`event-sourcing-java`. This changes that processor's name in its actor hierarchy and hence influences only
part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id-override .. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id-override
Overriding ``persistenceId`` is the recommended way to generate stable identifiers.
.. _recovery-java: .. _recovery-java:
@ -334,13 +326,8 @@ Further possibilities to customize initial recovery are explained in section :re
Identifiers Identifiers
----------- -----------
A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the A persistent view must have an identifier that doesn't change across different actor incarnations.
``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` The identifier must be defined with the ``viewId`` method.
method.
Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's
name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the
``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its
persistent actor shall be shared (which is what applications usually do not want). persistent actor shall be shared (which is what applications usually do not want).

View file

@ -88,6 +88,8 @@ trait PersistenceDocSpec {
} }
class MyProcessor4 extends PersistentActor { class MyProcessor4 extends PersistentActor {
override def persistenceId = "my-stable-persistence-id"
//#recovery-completed //#recovery-completed
def receiveRecover: Receive = { def receiveRecover: Receive = {
@ -310,6 +312,8 @@ trait PersistenceDocSpec {
class MyPersistentActor(destination: ActorRef) extends PersistentActor { class MyPersistentActor(destination: ActorRef) extends PersistentActor {
val channel = context.actorOf(Channel.props("channel")) val channel = context.actorOf(Channel.props("channel"))
override def persistenceId = "my-stable-persistence-id"
def handleEvent(event: String) = { def handleEvent(event: String) = {
// update state // update state
// ... // ...
@ -338,6 +342,8 @@ trait PersistenceDocSpec {
//#persist-async //#persist-async
class MyPersistentActor extends PersistentActor { class MyPersistentActor extends PersistentActor {
override def persistenceId = "my-stable-persistence-id"
def receiveRecover: Receive = { def receiveRecover: Receive = {
case _ => // handle recovery here case _ => // handle recovery here
} }
@ -372,6 +378,8 @@ trait PersistenceDocSpec {
//#defer //#defer
class MyPersistentActor extends PersistentActor { class MyPersistentActor extends PersistentActor {
override def persistenceId = "my-stable-persistence-id"
def receiveRecover: Receive = { def receiveRecover: Receive = {
case _ => // handle recovery here case _ => // handle recovery here
} }

View file

@ -100,7 +100,7 @@ is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is de
.. includecode:: ../../../akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala#persistent-actor-example .. includecode:: ../../../akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala#persistent-actor-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. ``state`` of the ``ExamplePersistentActor`` is a list of persisted event data contained in ``ExampleState``.
The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` The persistent actor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example, and ``SnapshotOffer`` messages. The persistent actor's ``receiveCommand`` method is a command handler. In this example,
@ -134,20 +134,11 @@ It contains instructions on how to run the ``PersistentActorExample``.
Identifiers Identifiers
----------- -----------
A persistent actor must have an identifier that doesn't change across different actor incarnations. It defaults to the A persistent actor must have an identifier that doesn't change across different actor incarnations.
``String`` representation of persistent actor's path without the address part and can be obtained via the ``persistenceId`` The identifier must be defined with the ``persistenceId`` method.
method.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id
Applications can customize a persistent actor's id by specifying an actor name during persistent actor creation as shown in
section :ref:`event-sourcing`. This changes that persistent actor's name in its actor hierarchy and hence influences only
part of the persistent actor id. To fully customize a persistent actor's id, the ``persistenceId`` method must be overridden.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id-override .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id-override
Overriding ``persistenceId`` is the recommended way to generate stable identifiers.
.. _recovery: .. _recovery:
Recovery Recovery
@ -333,13 +324,8 @@ Further possibilities to customize initial recovery are explained in section :re
Identifiers Identifiers
----------- -----------
A persistent view must have an identifier that doesn't change across different actor incarnations. It defaults to the A persistent view must have an identifier that doesn't change across different actor incarnations.
``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` The identifier must be defined with the ``viewId`` method.
method.
Applications can customize a view's id by specifying an actor name during view creation. This changes that persistent view's
name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the
``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers.
The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its
persistent actor shall be shared (which is what applications usually do not want). persistent actor shall be shared (which is what applications usually do not want).

View file

@ -6,7 +6,7 @@ package akka.persistence
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import akka.actor.AbstractActor import akka.actor.{ AbstractActor, UntypedActor }
import akka.japi.{ Procedure, Util } import akka.japi.{ Procedure, Util }
import akka.persistence.JournalProtocol._ import akka.persistence.JournalProtocol._
@ -17,7 +17,7 @@ import scala.collection.immutable
* *
* Event sourcing mixin for a [[Processor]]. * Event sourcing mixin for a [[Processor]].
*/ */
private[persistence] trait Eventsourced extends Processor { private[persistence] trait Eventsourced extends ProcessorImpl {
// TODO consolidate these traits as PersistentActor #15230 // TODO consolidate these traits as PersistentActor #15230
/** /**
@ -388,23 +388,15 @@ trait EventsourcedProcessor extends Processor with Eventsourced {
/** /**
* An persistent Actor - can be used to implement command or event sourcing. * An persistent Actor - can be used to implement command or event sourcing.
*/ */
// TODO remove EventsourcedProcessor / Processor #15230 trait PersistentActor extends ProcessorImpl with Eventsourced {
trait PersistentActor extends EventsourcedProcessor def receive = receiveCommand
}
/** /**
* Java API: an persistent actor - can be used to implement command or event sourcing. * Java API: an persistent actor - can be used to implement command or event sourcing.
*/ */
abstract class UntypedPersistentActor extends UntypedEventsourcedProcessor abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl with Eventsourced {
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class AbstractPersistentActor extends AbstractEventsourcedProcessor
/**
* Java API: an event sourced processor.
*/
@deprecated("UntypedEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced {
final def onReceive(message: Any) = onReceiveCommand(message) final def onReceive(message: Any) = onReceiveCommand(message)
final def receiveRecover: Receive = { final def receiveRecover: Receive = {
@ -558,16 +550,10 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
} }
/** /**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]): * Java API: an persistent actor - can be used to implement command or event sourcing.
* command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/ */
@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") abstract class AbstractPersistentActor extends AbstractActor with PersistentActor with Eventsourced {
abstract class AbstractEventsourcedProcessor extends AbstractActor with EventsourcedProcessor {
/** /**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor * persisted event. It is guaranteed that no new commands will be received by a processor
@ -676,9 +662,28 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou
final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit = final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAsync(Util.immutableSeq(events))(event handler(event)) persistAsync(Util.immutableSeq(events))(event handler(event))
override def receive = super[EventsourcedProcessor].receive override def receive = super[PersistentActor].receive
override def receive(receive: Receive): Unit = { }
throw new IllegalArgumentException("Define the behavior by overriding receiveRecover and receiveCommand")
} /**
* Java API: an event sourced processor.
*/
@deprecated("UntypedEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
abstract class UntypedEventsourcedProcessor extends UntypedPersistentActor {
override def persistenceId: String = processorId
}
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]):
* command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
abstract class AbstractEventsourcedProcessor extends AbstractPersistentActor {
override def persistenceId: String = processorId
} }

View file

@ -52,8 +52,17 @@ import akka.dispatch._
* @see [[PersistentBatch]] * @see [[PersistentBatch]]
*/ */
@deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") @deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4")
trait Processor extends Actor with Recovery { trait Processor extends ProcessorImpl {
// todo remove Processor in favor of PersistentActor #15230 /**
* Persistence id. Defaults to this persistent-actors's path and can be overridden.
*/
override def persistenceId: String = processorId
}
/** INTERNAL API */
@deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4")
private[akka] trait ProcessorImpl extends Actor with Recovery {
// TODO: remove Processor in favor of PersistentActor #15230
import JournalProtocol._ import JournalProtocol._
@ -168,11 +177,6 @@ trait Processor extends Actor with Recovery {
@deprecated("Override `persistenceId: String` instead. Processor will be removed.", since = "2.3.4") @deprecated("Override `persistenceId: String` instead. Processor will be removed.", since = "2.3.4")
override def processorId: String = _persistenceId // TODO: remove processorId override def processorId: String = _persistenceId // TODO: remove processorId
/**
* Persistence id. Defaults to this persistent-actors's path and can be overridden.
*/
override def persistenceId: String = processorId
/** /**
* Returns `persistenceId`. * Returns `persistenceId`.
*/ */

View file

@ -138,6 +138,10 @@ public class LambdaPersistenceDocTest {
//#recovery-completed //#recovery-completed
class MyPersistentActor5 extends AbstractPersistentActor { class MyPersistentActor5 extends AbstractPersistentActor {
@Override public String persistenceId() {
return "my-stable-persistence-id";
}
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() { @Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder. return ReceiveBuilder.
match(String.class, this::handleEvent).build(); match(String.class, this::handleEvent).build();
@ -374,6 +378,10 @@ public class LambdaPersistenceDocTest {
this.channel = context().actorOf(Channel.props(), "channel"); this.channel = context().actorOf(Channel.props(), "channel");
} }
@Override public String persistenceId() {
return "my-stable-persistence-id";
}
private void handleEvent(String event) { private void handleEvent(String event) {
// update state // update state
// ... // ...
@ -401,6 +409,10 @@ public class LambdaPersistenceDocTest {
//#persist-async //#persist-async
class MyPersistentActor extends AbstractPersistentActor { class MyPersistentActor extends AbstractPersistentActor {
@Override public String persistenceId() {
return "my-stable-persistence-id";
}
private void handleCommand(String c) { private void handleCommand(String c) {
sender().tell(c, self()); sender().tell(c, self());
@ -447,6 +459,10 @@ public class LambdaPersistenceDocTest {
//#defer //#defer
class MyPersistentActor extends AbstractPersistentActor { class MyPersistentActor extends AbstractPersistentActor {
@Override public String persistenceId() {
return "my-stable-persistence-id";
}
private void handleCommand(String c) { private void handleCommand(String c) {
persistAsync(String.format("evt-%s-1", c), e -> { persistAsync(String.format("evt-%s-1", c), e -> {
sender().tell(e, self()); sender().tell(e, self());

View file

@ -74,12 +74,16 @@ class ExampleState implements Serializable {
} }
class ExamplePersistentActor extends AbstractPersistentActor { class ExamplePersistentActor extends AbstractPersistentActor {
private ExampleState state = new ExampleState(); private ExampleState state = new ExampleState();
public int getNumEvents() { public int getNumEvents() {
return state.size(); return state.size();
} }
@Override
public String persistenceId() { return "sample-id-1"; }
@Override @Override
public PartialFunction<Object, BoxedUnit> receiveRecover() { public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder. return ReceiveBuilder.

View file

@ -18,6 +18,9 @@ public class PersistentActorFailureExample {
public static class ExamplePersistentActor extends AbstractPersistentActor { public static class ExamplePersistentActor extends AbstractPersistentActor {
private ArrayList<Object> received = new ArrayList<Object>(); private ArrayList<Object> received = new ArrayList<Object>();
@Override
public String persistenceId() { return "sample-id-2"; }
@Override @Override
public PartialFunction<Object, BoxedUnit> receiveCommand() { public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder. return ReceiveBuilder.

View file

@ -1,58 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class ProcessorChannelExample {
public static class ExamplePersistentActor extends AbstractProcessor {
private ActorRef destination;
private ActorRef channel;
public ExamplePersistentActor(ActorRef destination) {
this.destination = destination;
this.channel = context().actorOf(Channel.props(), "channel");
receive(ReceiveBuilder.
match(Persistent.class, p -> {
System.out.println("processed " + p.payload());
channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self());
}).
match(String.class, s -> System.out.println("reply = " + s)).build()
);
}
}
public static class ExampleDestination extends AbstractActor {
public ExampleDestination() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println("received " + cp.payload());
sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null);
cp.confirm();
}).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class));
final ActorRef processor = system.actorOf(Props.create(ExamplePersistentActor.class, destination), "processor-1");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -62,6 +62,9 @@ public class SnapshotExample {
build(); build();
} }
@Override
public String persistenceId() { return "sample-id-3"; }
@Override @Override
public PartialFunction<Object, BoxedUnit> receiveRecover() { public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder. return ReceiveBuilder.

View file

@ -21,9 +21,7 @@ public class ViewExample {
private int count = 1; private int count = 1;
@Override @Override
public String persistenceId() { public String persistenceId() { return "sample-id-4"; }
return "persistentActor-5";
}
@Override @Override
public PartialFunction<Object, BoxedUnit> receiveCommand() { public PartialFunction<Object, BoxedUnit> receiveCommand() {
@ -49,10 +47,8 @@ public class ViewExample {
private int numReplicated = 0; private int numReplicated = 0;
@Override public String persistenceId() { return "persistentActor-5"; } @Override public String persistenceId() { return "sample-id-4"; }
@Override public String viewId() { @Override public String viewId() { return "sample-view-id-4"; }
return "view-5";
}
public ExampleView() { public ExampleView() {
receive(ReceiveBuilder. receive(ReceiveBuilder.

View file

@ -66,6 +66,9 @@ class ExampleState implements Serializable {
} }
class ExamplePersistentActor extends UntypedPersistentActor { class ExamplePersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "sample-id-1"; }
private ExampleState state = new ExampleState(); private ExampleState state = new ExampleState();
public int getNumEvents() { public int getNumEvents() {

View file

@ -1,12 +1,18 @@
package sample.persistence; package sample.persistence;
import java.util.ArrayList; import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure; import akka.japi.Procedure;
import akka.actor.*; import akka.persistence.UntypedPersistentActor;
import akka.persistence.*;
import java.util.ArrayList;
public class PersistentActorFailureExample { public class PersistentActorFailureExample {
public static class ExamplePersistentActor extends UntypedPersistentActor { public static class ExamplePersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "sample-id-2"; }
private ArrayList<Object> received = new ArrayList<Object>(); private ArrayList<Object> received = new ArrayList<Object>();
@Override @Override
@ -35,7 +41,6 @@ public class PersistentActorFailureExample {
unhandled(message); unhandled(message);
} }
} }
} }
public static void main(String... args) throws Exception { public static void main(String... args) throws Exception {

View file

@ -13,12 +13,10 @@ import java.util.concurrent.TimeUnit;
public class PersistentViewExample { public class PersistentViewExample {
public static class ExamplePersistentActor extends UntypedPersistentActor { public static class ExamplePersistentActor extends UntypedPersistentActor {
private int count = 1;
@Override @Override
public String persistenceId() { public String persistenceId() { return "sample-id-4"; }
return "persistentActor-5";
} private int count = 1;
@Override @Override
public void onReceiveRecover(Object message) { public void onReceiveRecover(Object message) {
@ -49,8 +47,8 @@ public class PersistentViewExample {
private int numReplicated = 0; private int numReplicated = 0;
@Override public String persistenceId() { return "persistentActor-5"; } @Override public String persistenceId() { return "sample-id-4"; }
@Override public String viewId() { return "view-5"; } @Override public String viewId() { return "sample-view-id-4"; }
@Override @Override
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {

View file

@ -1,12 +1,17 @@
package sample.persistence; package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.UntypedPersistentActor;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import akka.actor.*;
import akka.persistence.*;
import akka.japi.Procedure;
public class SnapshotExample { public class SnapshotExample {
public static class ExampleState implements Serializable { public static class ExampleState implements Serializable {
private final ArrayList<String> received; private final ArrayList<String> received;
@ -34,6 +39,9 @@ public class SnapshotExample {
} }
public static class ExamplePersistentActor extends UntypedPersistentActor { public static class ExamplePersistentActor extends UntypedPersistentActor {
@Override
public String persistenceId() { return "sample-id-3"; }
private ExampleState state = new ExampleState(); private ExampleState state = new ExampleState();
@Override @Override
@ -72,6 +80,7 @@ public class SnapshotExample {
unhandled(message); unhandled(message);
} }
} }
} }
public static void main(String... args) throws Exception { public static void main(String... args) throws Exception {

View file

@ -4,20 +4,22 @@ package sample.persistence
import akka.actor._ import akka.actor._
import akka.persistence._ import akka.persistence._
final case class Cmd(data: String) case class Cmd(data: String)
final case class Evt(data: String) case class Evt(data: String)
final case class ExampleState(events: List[String] = Nil) { case class ExampleState(events: List[String] = Nil) {
def update(evt: Evt) = copy(evt.data :: events) def updated(evt: Evt): ExampleState = copy(evt.data :: events)
def size = events.length def size: Int = events.length
override def toString: String = events.reverse.toString override def toString: String = events.reverse.toString
} }
class ExampleProcessor extends PersistentActor { class ExamplePersistentActor extends PersistentActor {
override def persistenceId = "sample-id-1"
var state = ExampleState() var state = ExampleState()
def updateState(event: Evt): Unit = def updateState(event: Evt): Unit =
state = state.update(event) state = state.updated(event)
def numEvents = def numEvents =
state.size state.size
@ -44,14 +46,14 @@ class ExampleProcessor extends PersistentActor {
object PersistentActorExample extends App { object PersistentActorExample extends App {
val system = ActorSystem("example") val system = ActorSystem("example")
val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala") val persistentActor = system.actorOf(Props[ExamplePersistentActor], "persistentActor-4-scala")
processor ! Cmd("foo") persistentActor ! Cmd("foo")
processor ! Cmd("baz") persistentActor ! Cmd("baz")
processor ! Cmd("bar") persistentActor ! Cmd("bar")
processor ! "snap" persistentActor ! "snap"
processor ! Cmd("buzz") persistentActor ! Cmd("buzz")
processor ! "print" persistentActor ! "print"
Thread.sleep(1000) Thread.sleep(1000)
system.shutdown() system.shutdown()

View file

@ -5,6 +5,8 @@ import akka.persistence._
object PersistentActorFailureExample extends App { object PersistentActorFailureExample extends App {
class ExamplePersistentActor extends PersistentActor { class ExamplePersistentActor extends PersistentActor {
override def persistenceId = "sample-id-2"
var received: List[String] = Nil // state var received: List[String] = Nil // state
def receiveCommand: Actor.Receive = { def receiveCommand: Actor.Receive = {

View file

@ -10,6 +10,8 @@ object SnapshotExample extends App {
} }
class ExamplePersistentActor extends PersistentActor { class ExamplePersistentActor extends PersistentActor {
def persistenceId: String = "sample-id-3"
var state = ExampleState() var state = ExampleState()
def receiveCommand: Actor.Receive = { def receiveCommand: Actor.Receive = {
@ -28,6 +30,7 @@ object SnapshotExample extends App {
case evt: String => case evt: String =>
state = state.updated(evt) state = state.updated(evt)
} }
} }
val system = ActorSystem("example") val system = ActorSystem("example")

View file

@ -7,7 +7,7 @@ import akka.persistence._
object ViewExample extends App { object ViewExample extends App {
class ExamplePersistentActor extends PersistentActor { class ExamplePersistentActor extends PersistentActor {
override def persistenceId = "persistentActor-5" override def persistenceId = "sample-id-4"
var count = 1 var count = 1
@ -27,8 +27,8 @@ object ViewExample extends App {
class ExampleView extends PersistentView { class ExampleView extends PersistentView {
private var numReplicated = 0 private var numReplicated = 0
override def persistenceId: String = "persistentActor-5" override def persistenceId: String = "sample-id-4"
override def viewId = "view-5" override def viewId = "sample-view-id-4"
def receive = { def receive = {
case "snap" => case "snap" =>