From c1f7d7fa21a8bf647350d3399fb855665c452118 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 24 Mar 2014 15:35:54 +0100 Subject: [PATCH] =doc #3924 Doc how to be notified when recovery completed --- .../extension/SettingsExtensionDocTest.java | 4 +- .../docs/persistence/PersistenceDocTest.java | 38 +++++++++++++++++- akka-docs/rst/java/lambda-persistence.rst | 8 ++++ akka-docs/rst/java/persistence.rst | 8 ++++ .../docs/persistence/PersistenceDocSpec.scala | 29 ++++++++++++++ akka-docs/rst/scala/persistence.rst | 9 +++++ .../java/doc/LambdaPersistenceDocTest.java | 40 +++++++++++++++++++ 7 files changed, 132 insertions(+), 4 deletions(-) diff --git a/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTest.java b/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTest.java index b3bae19c7d..46e32e1d96 100644 --- a/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTest.java +++ b/akka-docs/rst/java/code/docs/extension/SettingsExtensionDocTest.java @@ -30,8 +30,8 @@ public class SettingsExtensionDocTest { public SettingsImpl(Config config) { DB_URI = config.getString("myapp.db.uri"); CIRCUIT_BREAKER_TIMEOUT = - Duration.create(config.getMilliseconds("myapp.circuit-breaker.timeout"), - TimeUnit.MILLISECONDS); + Duration.create(config.getDuration("myapp.circuit-breaker.timeout", + TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } } diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 2b36701bc3..2e296ce950 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -8,11 +8,9 @@ import java.util.concurrent.TimeUnit; import scala.Option; import scala.concurrent.duration.Duration; - import akka.actor.*; import akka.japi.Procedure; import akka.persistence.*; - import static java.util.Arrays.asList; public class PersistenceDocTest { @@ -122,6 +120,42 @@ public class PersistenceDocTest { @Override public void onReceive(Object message) throws Exception {} } + + class MyProcessor5 extends UntypedProcessor { + //#recovery-completed + @Override + public void preStart() throws Exception { + super.preStart(); + self().tell("FIRST", self()); + } + + public void onReceive(Object message) throws Exception { + if (message.equals("FIRST")) { + recoveryCompleted(); + getContext().become(active); + unstashAll(); + } else if (recoveryFinished()) { + stash(); + } else { + active.apply(message); + } + } + + private void recoveryCompleted() { + // perform init after recovery, before any other messages + // ... + } + + Procedure active = new Procedure() { + @Override + public void apply(Object message) { + if (message instanceof Persistent) { + // ... + } + } + }; + //#recovery-completed + } }; static Object o3 = new Object() { diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 3ef6391859..955e7d114d 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -122,6 +122,14 @@ A processor can query its own recovery status via the methods .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status +Sometimes there is a need for performing additional initialization when the +recovery has completed, before processing any other message sent to the processor. +The processor can send itself a message from ``preStart``. It will be stashed and received +after recovery. The mailbox may contain other messages that are queued in front of +that message and therefore you need to stash until you receive that message. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed + .. _failure-handling-java-lambda: Failure handling diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 7b1aa607f7..fe256b071b 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -130,6 +130,14 @@ A processor can query its own recovery status via the methods .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-status +Sometimes there is a need for performing additional initialization when the +recovery has completed, before processing any other message sent to the processor. +The processor can send itself a message from ``preStart``. It will be stashed and received +after recovery. The mailbox may contain other messages that are queued in front of +that message and therefore you need to stash until you receive that message. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed + .. _failure-handling-java: Failure handling diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index a735acd9bc..6253446802 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -84,6 +84,35 @@ trait PersistenceDocSpec { } //#deletion } + + class MyProcessor4 extends Processor { + //#recovery-completed + override def preStart(): Unit = { + super.preStart() + self ! "FIRST" + } + + def receive = initializing.orElse(active) + + def initializing: Receive = { + case "FIRST" => + recoveryCompleted() + context.become(active) + unstashAll() + case other if recoveryFinished => + stash() + } + + def recoveryCompleted(): Unit = { + // perform init after recovery, before any other messages + // ... + } + + def active: Receive = { + case Persistent(msg, _) => //... + } + //#recovery-completed + } } new AnyRef { diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 61c50689c5..9b4d1c35ce 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -119,6 +119,15 @@ A processor can query its own recovery status via the methods .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-status +Sometimes there is a need for performing additional initialization when the +recovery has completed, before processing any other message sent to the processor. +The processor can send itself a message from ``preStart``. It will be stashed and received +after recovery. The mailbox may contain other messages that are queued in front of +that message and therefore you need to stash until you receive that message. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed + + .. _failure-handling: Failure handling diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index a4fff7f432..4e2f985c2d 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -130,6 +130,46 @@ public class LambdaPersistenceDocTest { ); } } + + //#recovery-completed + class MyProcessor5 extends AbstractProcessor { + + public MyProcessor5() { + receive(ReceiveBuilder. + matchEquals("FIRST", s -> { + recoveryCompleted(); + getContext().become(active); + unstashAll(); + }). + matchAny(message -> { + if (recoveryFinished()) { + stash(); + } else { + active.apply(message); + } + }). + build() + ); + } + + @Override + public void preStart() throws Exception { + super.preStart(); + self().tell("FIRST", self()); + } + + private void recoveryCompleted() { + // perform init after recovery, before any other messages + // ... + } + + PartialFunction active = + ReceiveBuilder. + match(Persistent.class, message -> {/* ... */}). + build(); + + } + //#recovery-completed }; static Object o3 = new Object() {