=per #15429 Rewrite persistence documentation and samples for 2.3.4 changes

(cherry picked from commit 02351e32f110a8c4a249f0f3f84bae5898d1a836)

Conflicts:
	akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html
	akka-samples/akka-sample-persistence-java/tutorial/index.html
	akka-samples/akka-sample-persistence-scala/build.sbt
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ConversationRecoveryExample.scala
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorChannelExample.scala
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala
	akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/StreamExample.scala
	akka-samples/akka-sample-persistence-scala/tutorial/index.html
This commit is contained in:
Patrik Nordwall 2014-06-25 12:51:21 +02:00
parent 062d304b73
commit d6ffdf521c
35 changed files with 1091 additions and 2276 deletions

View file

@ -136,27 +136,31 @@ public class LambdaPersistenceDocTest {
}
//#recovery-completed
class MyProcessor5 extends AbstractProcessor {
public MyProcessor5() {
receive(ReceiveBuilder.
match(RecoveryCompleted.class, r -> {
recoveryCompleted();
getContext().become(active);
}).
build()
);
class MyPersistentActor5 extends AbstractPersistentActor {
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, this::handleEvent).build();
}
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(RecoveryCompleted.class, r -> {
recoveryCompleted();
}).
match(String.class, s -> s.equals("cmd"),
s -> persist("evt", this::handleEvent)).build();
}
private void recoveryCompleted() {
// perform init after recovery, before any other messages
// ...
}
PartialFunction<Object, BoxedUnit> active =
ReceiveBuilder.
match(Persistent.class, message -> {/* ... */}).
build();
private void handleEvent(String event) {
// update state
// ...
}
}
//#recovery-completed

View file

@ -1,76 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class ConversationRecoveryExample {
public static String PING = "PING";
public static String PONG = "PONG";
public static class Ping extends AbstractProcessor {
final ActorRef pongChannel = context().actorOf(Channel.props(), "pongChannel");
int counter = 0;
public Ping() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PING), cp -> {
counter += 1;
System.out.println(String.format("received ping %d times", counter));
cp.confirm();
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pongChannel.tell(Deliver.create(cp.withPayload(PONG), sender().path()), self());
}).
match(String.class,
s -> s.equals("init"),
s -> pongChannel.tell(Deliver.create(Persistent.create(PONG), sender().path()), self())).build()
);
}
}
public static class Pong extends AbstractProcessor {
private final ActorRef pingChannel = context().actorOf(Channel.props(), "pingChannel");
private int counter = 0;
public Pong() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PONG), cp -> {
counter += 1;
System.out.println(String.format("received pong %d times", counter));
cp.confirm();
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pingChannel.tell(Deliver.create(cp.withPayload(PING), sender().path()), self());
}).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping");
final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong");
ping.tell("init", pong);
}
}

View file

@ -73,43 +73,46 @@ class ExampleState implements Serializable {
}
}
class ExampleProcessor extends AbstractPersistentActor {
class ExamplePersistentActor extends AbstractPersistentActor {
private ExampleState state = new ExampleState();
public int getNumEvents() {
return state.size();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(Evt.class, state::update).
match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()).build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.match(Cmd.class, c -> {
final String data = c.getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());
final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
persist(asList(evt1, evt2), (Evt evt) -> {
state.update(evt);
if (evt.equals(evt2)) {
context().system().eventStream().publish(evt);
}
});
}).
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(Cmd.class, c -> {
final String data = c.getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());
final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
persist(asList(evt1, evt2), (Evt evt) -> {
state.update(evt);
if (evt.equals(evt2)) {
context().system().eventStream().publish(evt);
}
});
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(state.copy())).
match(String.class, s -> s.equals("print"), s -> System.out.println(state)).build();
match(String.class, s -> s.equals("print"), s -> System.out.println(state)).
build();
}
}
//#persistent-actor-example
public class PersistentActorExample {
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-4-java8");
final ActorRef processor = system.actorOf(Props.create(ExamplePersistentActor.class), "processor-4-java8");
processor.tell(new Cmd("foo"), null);
processor.tell(new Cmd("baz"), null);
processor.tell(new Cmd("bar"), null);

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.util.ArrayList;
public class PersistentActorFailureExample {
public static class ExamplePersistentActor extends AbstractPersistentActor {
private ArrayList<Object> received = new ArrayList<Object>();
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}).
match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).
match(String.class, s -> {
persist(s, evt -> {
received.add(evt);
});
}).
build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, s -> received.add(s)).
build();
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef persistentActor = system.actorOf(Props.create(ExamplePersistentActor.class), "persistentActor-2");
persistentActor.tell("a", null);
persistentActor.tell("print", null);
persistentActor.tell("boom", null);
persistentActor.tell("print", null);
persistentActor.tell("b", null);
persistentActor.tell("print", null);
persistentActor.tell("c", null);
persistentActor.tell("print", null);
// Will print in a first run (i.e. with empty journal):
// received [a]
// received [a, b]
// received [a, b, c]
// Will print in a second run:
// received [a, b, c, a]
// received [a, b, c, a, b]
// received [a, b, c, a, b, c]
// etc ...
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -14,11 +14,11 @@ import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class ProcessorChannelExample {
public static class ExampleProcessor extends AbstractProcessor {
public static class ExamplePersistentActor extends AbstractProcessor {
private ActorRef destination;
private ActorRef channel;
public ExampleProcessor(ActorRef destination) {
public ExamplePersistentActor(ActorRef destination) {
this.destination = destination;
this.channel = context().actorOf(Channel.props(), "channel");
@ -47,7 +47,7 @@ public class ProcessorChannelExample {
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class));
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1");
final ActorRef processor = system.actorOf(Props.create(ExamplePersistentActor.class, destination), "processor-1");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);

View file

@ -1,71 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractProcessor;
import akka.persistence.Persistent;
import scala.Option;
import java.util.ArrayList;
public class ProcessorFailureExample {
public static class ExampleProcessor extends AbstractProcessor {
private ArrayList<Object> received = new ArrayList<Object>();
public ExampleProcessor() {
receive(ReceiveBuilder.
match(Persistent.class, p -> p.payload().equals("boom"), p -> {throw new RuntimeException("boom");}).
match(Persistent.class, p -> !p.payload().equals("boom"), p -> received.add(p.payload())).
match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}).
match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).build()
);
}
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage(((Persistent) message.get()).sequenceNr(), false);
}
super.preRestart(reason, message);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2");
processor.tell(Persistent.create("a"), null);
processor.tell("print", null);
processor.tell("boom", null);
processor.tell("print", null);
processor.tell(Persistent.create("b"), null);
processor.tell("print", null);
processor.tell(Persistent.create("boom"), null);
processor.tell("print", null);
processor.tell(Persistent.create("c"), null);
processor.tell("print", null);
// Will print in a first run (i.e. with empty journal):
// received [a]
// received [a, b]
// received [a, b, c]
// Will print in a second run:
// received [a, b, c, a]
// received [a, b, c, a, b]
// received [a, b, c, a, b, c]
// etc ...
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -8,7 +8,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractProcessor;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.Persistent;
import akka.persistence.SnapshotOffer;
import scala.PartialFunction;
@ -43,36 +43,47 @@ public class SnapshotExample {
}
}
public static class ExampleProcessor extends AbstractProcessor {
public static class ExamplePersistentActor extends AbstractPersistentActor {
private ExampleState state = new ExampleState();
public ExampleProcessor() {
receive(ReceiveBuilder.
match(Persistent.class, p -> state.update(String.format("%s-%d", p.payload(), p.sequenceNr()))).
match(SnapshotOffer.class, s -> {
ExampleState exState = (ExampleState) s.snapshot();
System.out.println("offered state = " + exState);
state = exState;
}).
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, s -> s.equals("print"), s -> System.out.println("current state = " + state)).
match(String.class, s -> s.equals("snap"), s ->
// IMPORTANT: create a copy of snapshot
// because ExampleState is mutable !!!
saveSnapshot(state.copy())).build()
);
saveSnapshot(state.copy())).
match(String.class, s -> {
persist(s, evt -> {
state.update(evt);
});
}).
build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, evt -> state.update(evt)).
match(SnapshotOffer.class, ss -> {
System.out.println("offered state = " + ss);
state = (ExampleState) ss.snapshot();
}).
build();
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java");
final ActorRef persistentActor = system.actorOf(Props.create(ExamplePersistentActor.class), "persistentActor-3-java");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
processor.tell("snap", null);
processor.tell(Persistent.create("c"), null);
processor.tell(Persistent.create("d"), null);
processor.tell("print", null);
persistentActor.tell("a", null);
persistentActor.tell("b", null);
persistentActor.tell("snap", null);
persistentActor.tell("c", null);
persistentActor.tell("d", null);
persistentActor.tell("print", null);
Thread.sleep(1000);
system.shutdown();

View file

@ -17,25 +17,35 @@ import scala.runtime.BoxedUnit;
import java.util.concurrent.TimeUnit;
public class ViewExample {
public static class ExampleProcessor extends AbstractProcessor {
public static class ExamplePersistentActor extends AbstractPersistentActor {
private int count = 1;
@Override
public String processorId() {
return "processor-5";
public String persistenceId() {
return "persistentActor-5";
}
public ExampleProcessor() {
receive(ReceiveBuilder.
match(Persistent.class,
p -> System.out.println(String.format("processor received %s (sequence nr = %d)",
p.payload(),
p.sequenceNr()))).build()
);
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, s -> {
System.out.println(String.format("persistentActor received %s (nr = %d)", s, count));
persist(s + count, evt -> {
count += 1;
});
}).
build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, s -> count += 1).
build();
}
}
public static class ExampleView extends AbstractView {
private final ActorRef destination = context().actorOf(Props.create(ExampleDestination.class));
private final ActorRef channel = context().actorOf(Channel.props("channel"));
private int numReplicated = 0;
@ -46,19 +56,16 @@ public class ViewExample {
@Override
public String persistenceId() {
return "processor-5";
return "persistentActor-5";
}
public ExampleView() {
receive(ReceiveBuilder.
match(Persistent.class, p -> {
numReplicated += 1;
System.out.println(String.format("view received %s (sequence nr = %d, num replicated = %d)",
System.out.println(String.format("view received %s (num replicated = %d)",
p.payload(),
p.sequenceNr(),
numReplicated));
channel.tell(Deliver.create(p.withPayload("replicated-" + p.payload()), destination.path()),
self());
}).
match(SnapshotOffer.class, so -> {
numReplicated = (Integer) so.snapshot();
@ -71,30 +78,16 @@ public class ViewExample {
}
}
public static class ExampleDestination extends AbstractActor {
public ExampleDestination() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println(String.format("destination received %s (sequence nr = %s)",
cp.payload(),
cp.sequenceNr()));
cp.confirm();
}).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class));
final ActorRef persistentActor = system.actorOf(Props.create(ExamplePersistentActor.class));
final ActorRef view = system.actorOf(Props.create(ExampleView.class));
system.scheduler()
.schedule(Duration.Zero(),
Duration.create(2, TimeUnit.SECONDS),
processor,
Persistent.create("scheduled"),
persistentActor,
"scheduled",
system.dispatcher(),
null);
system.scheduler()

View file

@ -12,12 +12,10 @@ This tutorial contains examples that illustrate a subset of
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/lambda-persistence.html" target="_blank">Akka Persistence</a> features.
</p>
<ul>
<li>Processors and channels</li>
<li>Processsor snapshots</li>
<li>Eventsourced processors</li>
<li>Processor failure handling</li>
<li>Processor views</li>
<li>Processor conversation recovery</li>
<li>persistent actor</li>
<li>persistent actor snapshots</li>
<li>persistent actor recovery</li>
<li>persistent actor views</li>
</ul>
<p>
@ -27,47 +25,13 @@ Custom storage locations for the journal and snapshots can be defined in
</div>
<div>
<h2>Processors and channels</h2>
<h2>Persistent actor</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ProcessorChannelExample.java" class="shortcut">ProcessorChannelExample.java</a>
defines an <code>ExampleProcessor</code> and an <code>ExampleDestination</code>. The processor sends messages to a
destination via a channel. The destination confirms the delivery of these messages so that replayed messages aren't
redundantly delivered to the destination. Repeated runs of the application demonstrates that the processor receives
both replayed and new messages whereas the channel only receives new messages, sent by the application. The processor
also receives replies from the destination, demonstrating that a channel preserves sender references.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ProcessorChannelExample</code></b> several times.
</p>
</div>
<div>
<h2>Processor snapshots</h2>
<p>
<a href="#code/src/main/java/sample/persistence/SnapshotExample.java" class="shortcut">SnapshotExample.java</a>
demonstrates how processors can take snapshots of application state and recover from previously stored snapshots.
Snapshots are offered to processors at the beginning of recovery, before any messages (younger than the snapshot)
are replayed.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.SnapshotExample</code></b> several times. With every run, the state offered by the
most recent snapshot is printed to <code>stdout</code>, followed by the updated state after sending new persistent
messages to the processor.
</p>
</div>
<div>
<h2>Eventsourced processors</h2>
<p>
<a href="#code/src/main/java/sample/persistence/EventsourcedExample.java" class="shortcut">EventsourcedExample.java</a>
is described in detail in the <a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/lambda-persistence.html#event-sourcing" target="_blank">Event sourcing</a>
section of the user documentation. With every application run, the <code>ExampleProcessor</code> is recovered from
<a href="#code/src/main/java/sample/persistence/PersistentActorExample.java" class="shortcut">PersistentActorExample.java</a>
is described in detail in the <a href="http://doc.akka.io/docs/akka/2.3-SNAPSHOT/java/lambda-persistence.html#event-sourcing-java-lambda" target="_blank">Event sourcing</a>
section of the user documentation. With every application run, the <code>ExamplePersistentActor</code> is recovered from
events stored in previous application runs, processes new commands, stores new events and snapshots and prints the
current processor state to <code>stdout</code>.
current persistent actor state to <code>stdout</code>.
</p>
<p>
@ -77,60 +41,48 @@ To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and
</div>
<div>
<h2>Processor failure handling</h2>
<h2>Persistent actor snapshots</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ProcessorFailureExample.java" class="shortcut">ProcessorFailureExample.java</a>
shows how a processor can delete persistent messages from the journal if they threw an exception. Throwing an exception
restarts the processor and replays messages. In order to prevent that the message that caused the exception is replayed,
it is marked as deleted in the journal (during invocation of <code>preRestart</code>). This is a common pattern in
command-sourcing to compensate write-ahead logging of messages.
<a href="#code/src/main/java/sample/persistence/SnapshotExample.java" class="shortcut">SnapshotExample.java</a>
demonstrates how persistent actors can take snapshots of application state and recover from previously stored snapshots.
Snapshots are offered to persistent actors at the beginning of recovery, before any messages (younger than the snapshot)
are replayed.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ProcessorFailureExample</code></b> several times.
</p>
<p>
<a href="http://doc.akka.io/docs/akka/2.4-SNAPSHOT/java/lambda-persistence.html#event-sourcing" target="_blank">Event sourcing</a>
on the other hand, does not persist commands directly but rather events that have been derived from received commands
(not shown here). These events are known to be successfully applicable to current processor state i.e. there's
no need for deleting them from the journal. Event sourced processors usually have a lower throughput than command
sourced processors, as the maximum size of a write batch is limited by the number of persisted events per received
command.
<b><code>sample.persistence.SnapshotExample</code></b> several times. With every run, the state offered by the
most recent snapshot is printed to <code>stdout</code>, followed by the updated state after sending new persistent
messages to the persistent actor.
</p>
</div>
<div>
<h2>Processor views</h2>
<h2>Persistent actor recovery</h2>
<p>
<a href="#code/src/main/java/sample/persistence/PersistentActorFailureExample.java" class="shortcut">PersistentActorFailureExample.java</a>
shows how a persistent actor can throw an exception, restart and restore the state by replaying the events.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.PersistentActorFailureExample</code></b> several times.
</p>
</div>
<div>
<h2>Persistent actor views</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ViewExample.java" class="shortcut">ViewExample.java</a> demonstrates
how a view (<code>ExampleView</code>) is updated with the persistent message stream of a processor
(<code>ExampleProcessor</code>). Messages sent to the processor are read from <code>stdin</code>. Views also support
snapshotting and can be used in combination with channels in the same way as processors.
how a view (<code>ExampleView</code>) is updated with the persistent message stream of a persistent actor
(<code>ExamplePersistentActor</code>). Messages sent to the persistent actor are scheduled periodically. Views also support
snapshotting to reduce recovery time.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ViewExample</code></b>.
</p>
<p>
Views can also receive events that have been persisted by event sourced processors (not shown).
</p>
</div>
<div>
<h2>Processor conversation recovery</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ConversationRecoveryExample.java" class="shortcut">ConversationRecoveryExample.java</a>
defines two processors that send messages to each other via channels. The reliable delivery properties of channels,
in combination with processors, allow these processors to automatically resume their conversation after a JVM crash.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ConversationRecoveryExample</code></b> several times.
</p>
</div>
</body>