=doc #3924 Doc how to be notified when recovery completed

This commit is contained in:
Patrik Nordwall 2014-03-24 15:35:54 +01:00
parent 4ba1bc46fb
commit c1f7d7fa21
7 changed files with 132 additions and 4 deletions

View file

@ -30,8 +30,8 @@ public class SettingsExtensionDocTest {
public SettingsImpl(Config config) { public SettingsImpl(Config config) {
DB_URI = config.getString("myapp.db.uri"); DB_URI = config.getString("myapp.db.uri");
CIRCUIT_BREAKER_TIMEOUT = CIRCUIT_BREAKER_TIMEOUT =
Duration.create(config.getMilliseconds("myapp.circuit-breaker.timeout"), Duration.create(config.getDuration("myapp.circuit-breaker.timeout",
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
} }
} }

View file

@ -8,11 +8,9 @@ import java.util.concurrent.TimeUnit;
import scala.Option; import scala.Option;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import akka.actor.*; import akka.actor.*;
import akka.japi.Procedure; import akka.japi.Procedure;
import akka.persistence.*; import akka.persistence.*;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
public class PersistenceDocTest { public class PersistenceDocTest {
@ -122,6 +120,42 @@ public class PersistenceDocTest {
@Override @Override
public void onReceive(Object message) throws Exception {} public void onReceive(Object message) throws Exception {}
} }
class MyProcessor5 extends UntypedProcessor {
//#recovery-completed
@Override
public void preStart() throws Exception {
super.preStart();
self().tell("FIRST", self());
}
public void onReceive(Object message) throws Exception {
if (message.equals("FIRST")) {
recoveryCompleted();
getContext().become(active);
unstashAll();
} else if (recoveryFinished()) {
stash();
} else {
active.apply(message);
}
}
private void recoveryCompleted() {
// perform init after recovery, before any other messages
// ...
}
Procedure<Object> active = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message instanceof Persistent) {
// ...
}
}
};
//#recovery-completed
}
}; };
static Object o3 = new Object() { static Object o3 = new Object() {

View file

@ -122,6 +122,14 @@ A processor can query its own recovery status via the methods
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status
Sometimes there is a need for performing additional initialization when the
recovery has completed, before processing any other message sent to the processor.
The processor can send itself a message from ``preStart``. It will be stashed and received
after recovery. The mailbox may contain other messages that are queued in front of
that message and therefore you need to stash until you receive that message.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed
.. _failure-handling-java-lambda: .. _failure-handling-java-lambda:
Failure handling Failure handling

View file

@ -130,6 +130,14 @@ A processor can query its own recovery status via the methods
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-status .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-status
Sometimes there is a need for performing additional initialization when the
recovery has completed, before processing any other message sent to the processor.
The processor can send itself a message from ``preStart``. It will be stashed and received
after recovery. The mailbox may contain other messages that are queued in front of
that message and therefore you need to stash until you receive that message.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed
.. _failure-handling-java: .. _failure-handling-java:
Failure handling Failure handling

View file

@ -84,6 +84,35 @@ trait PersistenceDocSpec {
} }
//#deletion //#deletion
} }
class MyProcessor4 extends Processor {
//#recovery-completed
override def preStart(): Unit = {
super.preStart()
self ! "FIRST"
}
def receive = initializing.orElse(active)
def initializing: Receive = {
case "FIRST" =>
recoveryCompleted()
context.become(active)
unstashAll()
case other if recoveryFinished =>
stash()
}
def recoveryCompleted(): Unit = {
// perform init after recovery, before any other messages
// ...
}
def active: Receive = {
case Persistent(msg, _) => //...
}
//#recovery-completed
}
} }
new AnyRef { new AnyRef {

View file

@ -119,6 +119,15 @@ A processor can query its own recovery status via the methods
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-status .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-status
Sometimes there is a need for performing additional initialization when the
recovery has completed, before processing any other message sent to the processor.
The processor can send itself a message from ``preStart``. It will be stashed and received
after recovery. The mailbox may contain other messages that are queued in front of
that message and therefore you need to stash until you receive that message.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed
.. _failure-handling: .. _failure-handling:
Failure handling Failure handling

View file

@ -130,6 +130,46 @@ public class LambdaPersistenceDocTest {
); );
} }
} }
//#recovery-completed
class MyProcessor5 extends AbstractProcessor {
public MyProcessor5() {
receive(ReceiveBuilder.
matchEquals("FIRST", s -> {
recoveryCompleted();
getContext().become(active);
unstashAll();
}).
matchAny(message -> {
if (recoveryFinished()) {
stash();
} else {
active.apply(message);
}
}).
build()
);
}
@Override
public void preStart() throws Exception {
super.preStart();
self().tell("FIRST", self());
}
private void recoveryCompleted() {
// perform init after recovery, before any other messages
// ...
}
PartialFunction<Object, BoxedUnit> active =
ReceiveBuilder.
match(Persistent.class, message -> {/* ... */}).
build();
}
//#recovery-completed
}; };
static Object o3 = new Object() { static Object o3 = new Object() {