diff --git a/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java b/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java index bd47648fe7..cd03439b2e 100644 --- a/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java +++ b/akka-actor/src/main/java/akka/japi/pf/ReceiveBuilder.java @@ -14,8 +14,8 @@ package akka.japi.pf; * Example: *
  * @Override
- * public PartialFunction receive() {
- *   return ReceiveBuilder.
+ * public Actor() {
+ *   receive(ReceiveBuilder.
  *     match(Double.class, d -> {
  *       sender().tell(d.isNaN() ? 0 : d, self());
  *     }).
@@ -24,7 +24,8 @@ package akka.japi.pf;
  *     }).
  *     match(String.class, s -> s.startsWith("foo"), s -> {
  *       sender().tell(s.toUpperCase(), self());
- *     }).build();
+ *     }).build()
+ *   );
  * }
  * 
* diff --git a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala index 98d36ae7cf..699b918d8e 100644 --- a/akka-actor/src/main/scala/akka/actor/AbstractActor.scala +++ b/akka-actor/src/main/scala/akka/actor/AbstractActor.scala @@ -25,9 +25,9 @@ object AbstractActor { *
  * public class MyActor extends AbstractActor {
  *   int count = 0;
- *   @Override
- *   public PartialFunction receive() {
- *     return ReceiveBuilder.
+ *
+ *   public MyActor() {
+ *     receive(ReceiveBuilder.
  *       match(Double.class, d -> {
  *         sender().tell(d.isNaN() ? 0 : d, self());
  *       }).
@@ -36,7 +36,8 @@ object AbstractActor {
  *       }).
  *       match(String.class, s -> s.startsWith("foo"), s -> {
  *         sender().tell(s.toUpperCase(), self());
- *       }).build();
+ *       }).build()
+ *     );
  *   }
  * }
  * 
@@ -44,12 +45,30 @@ object AbstractActor { * This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing. */ abstract class AbstractActor extends Actor { + + private var _receive: Receive = null + + /** + * Set up the initial receive behavior of the Actor. + * + * @param receive The receive behavior. + */ + @throws(classOf[IllegalActorStateException]) + protected def receive(receive: Receive): Unit = + if (_receive == null) _receive = receive + else throw IllegalActorStateException("Actor behavior has already been set with receive(...), " + + "use context().become(...) to change it later") + /** * Returns this AbstractActor's AbstractActorContext * The AbstractActorContext is not thread safe so do not expose it outside of the * AbstractActor. */ def getContext(): AbstractActorContext = context.asInstanceOf[AbstractActorContext] + + override def receive = + if (_receive != null) _receive + else throw IllegalActorStateException("Actor behavior has not been set with receive(...)") } /** @@ -73,9 +92,9 @@ abstract class AbstractLoggingActor extends AbstractActor with ActorLogging *
  * public class MyActorWithStash extends AbstractActorWithStash {
  *   int count = 0;
- *   @Override
- *   public PartialFunction receive() {
- *     return ReceiveBuilder.match(String.class, s -> {
+ *
+ *   public MyActorWithStash() {
+ *     receive(ReceiveBuilder. match(String.class, s -> {
  *       if (count < 0) {
  *         sender().tell(new Integer(s.length()), self());
  *       } else if (count == 2) {
@@ -84,8 +103,8 @@ abstract class AbstractLoggingActor extends AbstractActor with ActorLogging
  *       } else {
  *         count += 1;
  *         stash();
- *       }
- *     }).build();
+ *       }}).build()
+ *     );
  *   }
  * }
  * 
diff --git a/akka-docs/rst/java/lambda-actors.rst b/akka-docs/rst/java/lambda-actors.rst index 59b488ba2b..8c3202a9ea 100644 --- a/akka-docs/rst/java/lambda-actors.rst +++ b/akka-docs/rst/java/lambda-actors.rst @@ -38,11 +38,13 @@ Creating Actors Defining an Actor class ----------------------- -Actor classes are implemented by extending the :class:`AbstractActor` class and implementing -the :meth:`receive` method. The :meth:`receive` method should define a series of match -statements (which has the type ``PartialFunction``) that defines -which messages your Actor can handle, along with the implementation of how the -messages should be processed. +Actor classes are implemented by extending the :class:`AbstractActor` class and setting +the “initial behavior” in the constructor by calling the :meth:`receive` method in +the :class:`AbstractActor`. + +The argument to the :meth:`receive` method is a ``PartialFunction`` +that defines which messages your Actor can handle, along with the implementation of +how the messages should be processed. Don't let the type signature scare you. To allow you to easily build up a partial function there is a builder named ``ReceiveBuilder`` that you can use. @@ -64,7 +66,7 @@ Note further that the return type of the behavior defined above is ``Unit``; if the actor shall reply to the received message then this must be done explicitly as explained below. -The result of the :meth:`receive` method is a partial function object, which is +The argument to the :meth:`receive` method is a partial function object, which is stored within the actor as its “initial behavior”, see `Become/Unbecome`_ for further information on changing the behavior of an actor after its construction. @@ -218,8 +220,8 @@ last line. Watching an actor is quite simple as well: Actor API ========= -The :class:`AbstractActor` class defines only one abstract method, the above mentioned -:meth:`receive`, which implements the behavior of the actor. +The :class:`AbstractActor` class defines a method called :meth:`receive`, +that is used to set the “initial behavior” of the actor. If the current actor behavior does not match a received message, :meth:`unhandled` is called, which by default publishes an @@ -592,13 +594,21 @@ routers, load-balancers, replicators etc. Receive messages ================ -An Actor has to implement the ``receive`` method to receive messages: +An Actor either has to set its initial receive behavior in the constructor by +calling the :meth:`receive` method in the :class:`AbstractActor`: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java + :include: receive-constructor + :exclude: and-some-behavior + +or by implementing the :meth:`receive` method in the :class:`Actor` interface: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#receive -The :meth:`receive` method should define a series of match statements (which has the type -``PartialFunction``) that defines which messages your Actor can handle, -along with the implementation of how the messages should be processed. +Both the argument to the :class:`AbstractActor` :meth:`receive` method and the return +type of the :class:`Actor` :meth:`receive` method is a ``PartialFunction`` +that defines which messages your Actor can handle, along with the implementation of how the messages +should be processed. Don't let the type signature scare you. To allow you to easily build up a partial function there is a builder named ``ReceiveBuilder`` that you can use. @@ -765,9 +775,6 @@ behavior is not the default). .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#swapper -Stash -===== - Stash ===== diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index aaab826af4..3ef6391859 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -41,7 +41,7 @@ Architecture ============ * *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal - before its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed + before its behavior is called. When a processor is started or restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages. * *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another @@ -71,13 +71,13 @@ Architecture Processors ========== -A processor can be implemented by extending ``AbstractProcessor`` class and implementing the -``receive`` method. +A processor can be implemented by extending ``AbstractProcessor`` class and setting +the “initial behavior” in the constructor by calling the :meth:`receive` method .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#definition Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted. -When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message +When a processor's behavior is called with a ``Persistent`` message it can safely assume that this message has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor is stopped, by default. If a processor should continue running on persistence failures it must handle ``PersistenceFailure`` messages. In this case, a processor may want to inform the sender about the failure, @@ -164,9 +164,8 @@ Overriding ``processorId`` is the recommended way to generate stable identifiers Views ===== -Views can be implemented by extending the ``AbstractView`` abstract class and implementing the ``receive`` and the -``processorId`` -methods. +Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``processorId`` method +and setting the “initial behavior” in the constructor by calling the :meth:`receive` method. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 3b00ae4732..50f4467535 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -222,7 +222,7 @@ private[persistence] trait Eventsourced extends Processor { * An event sourced processor. */ trait EventsourcedProcessor extends Processor with Eventsourced { - final def receive = receiveCommand + def receive = receiveCommand } /** @@ -344,4 +344,10 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou */ final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit = persist(Util.immutableSeq(events))(event ⇒ handler(event)) + + override def receive = super[EventsourcedProcessor].receive + + override def receive(receive: Receive): Unit = { + throw new IllegalArgumentException("Define the behavior by overriding receiveRecover and receiveCommand") + } } \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 6106f0d2f3..5c1e9309c5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -388,28 +388,23 @@ final case class RecoveryException(message: String, cause: Throwable) extends Ak abstract class UntypedProcessor extends UntypedActor with Processor /** - * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]). + * Java API: compatible with lambda expressions + * * An actor that persists (journals) messages of type [[Persistent]]. Messages of other types * are not persisted. - * - * {{{ - * import akka.persistence.AbstractProcessor; - * import akka.persistence.Persistent; - * import akka.actor.ActorRef; - * import akka.actor.Props; - * import akka.japi.pf.ReceiveBuilder; - * import scala.PartialFunction; - * import scala.runtime.BoxedUnit; - * + *

+ * Example: + *

  * class MyProcessor extends AbstractProcessor {
- *     public PartialFunction receive() {
- *         return ReceiveBuilder.
- *             match(Persistent.class, p -> {
- *                 Object payload = p.payload();
- *                 Long sequenceNr = p.sequenceNr();
+ *   public MyProcessor() {
+ *     receive(ReceiveBuilder.
+ *       match(Persistent.class, p -> {
+ *         Object payload = p.payload();
+ *         Long sequenceNr = p.sequenceNr();
  *                 // ...
- *             }).build();
- *     }
+ *       }).build()
+ *     );
+ *   }
  * }
  *
  * // ...
@@ -418,7 +413,7 @@ abstract class UntypedProcessor extends UntypedActor with Processor
  *
  * processor.tell(Persistent.create("foo"), null);
  * processor.tell("bar", null);
- * }}}
+ * 
* * During start and restart, persistent messages are replayed to a processor so that it can recover internal * state from these messages. New messages sent to a processor during recovery do not interfere with replayed diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java index 9579f14b72..4caa42f60d 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java @@ -8,10 +8,16 @@ import akka.actor.*; import akka.event.LoggingAdapter; import akka.event.Logging; import akka.japi.pf.ReceiveBuilder; +import akka.testkit.ErrorFilter; +import akka.testkit.EventFilter; +import akka.testkit.TestEvent; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import scala.PartialFunction; import scala.runtime.BoxedUnit; import static docs.actor.Messages.Swap.Swap; import static docs.actor.Messages.*; +import static akka.japi.Util.immutableSeq; import java.util.concurrent.TimeUnit; @@ -43,11 +49,19 @@ import static akka.pattern.Patterns.gracefulStop; public class ActorDocTest { + public static Config config = ConfigFactory.parseString( + "akka {\n" + + " loggers = [\"akka.testkit.TestEventListener\"]\n" + + " loglevel = \"WARNING\"\n" + + " stdout-loglevel = \"WARNING\"\n" + + "}\n" + ); + static ActorSystem system = null; @BeforeClass public static void beforeClass() { - system = ActorSystem.create("ActorDocTest"); + system = ActorSystem.create("ActorDocTest", config); } @AfterClass @@ -61,18 +75,27 @@ public class ActorDocTest { public class FirstActor extends AbstractActor { final ActorRef child = context().actorOf(Props.create(MyActor.class), "myChild"); //#plus-some-behavior - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public FirstActor() { + receive(ReceiveBuilder. matchAny(x -> { sender().tell(x, self()); - }).build(); + }).build() + ); } //#plus-some-behavior } //#context-actorOf - static public abstract class ReceiveActor extends AbstractActor { + static public abstract class SomeActor extends AbstractActor { + //#receive-constructor + public SomeActor() { + receive(ReceiveBuilder. + //#and-some-behavior + match(String.class, s -> System.out.println(s.toLowerCase())). + //#and-some-behavior + build()); + } + //#receive-constructor @Override //#receive public abstract PartialFunction receive(); @@ -84,12 +107,9 @@ public class ActorDocTest { ActorWithArgs(String args) { this.args = args; - } - - @Override - public PartialFunction receive() { - return ReceiveBuilder. - matchAny(x -> { }).build(); + receive(ReceiveBuilder. + matchAny(x -> { }).build() + ); } } @@ -112,14 +132,11 @@ public class ActorDocTest { DemoActor(Integer magicNumber) { this.magicNumber = magicNumber; - } - - @Override - public PartialFunction receive() { - return ReceiveBuilder. + receive(ReceiveBuilder. match(Integer.class, i -> { sender().tell(i + magicNumber, self()); - }).build(); + }).build() + ); } } @@ -131,9 +148,8 @@ public class ActorDocTest { ActorRef demoActor = context().actorOf(DemoActor.props(42), "demo"); // ... //#props-factory - @Override - public PartialFunction receive() { - return emptyBehavior(); + public SomeOtherActor() { + receive(emptyBehavior()); } //#props-factory } @@ -141,15 +157,15 @@ public class ActorDocTest { public static class Hook extends AbstractActor { ActorRef target = null; + public Hook() { + receive(emptyBehavior()); + } //#preStart @Override public void preStart() { target = context().actorOf(Props.create(MyActor.class, "target")); } //#preStart - public PartialFunction receive() { - return emptyBehavior(); - } //#postStop @Override public void postStop() { @@ -191,9 +207,8 @@ public class ActorDocTest { } public static class ReplyException extends AbstractActor { - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public ReplyException() { + receive(ReceiveBuilder. matchAny(x -> { //#reply-exception try { @@ -204,7 +219,8 @@ public class ActorDocTest { throw e; } //#reply-exception - }).build(); + }).build() + ); } private String operation() { @@ -223,16 +239,16 @@ public class ActorDocTest { private ActorRef worker = context().watch(context().actorOf(Props.create(Cruncher.class), "worker")); - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public Manager() { + receive(ReceiveBuilder. matchEquals("job", s -> { worker.tell("crunch", self()); }). matchEquals(SHUTDOWN, x -> { worker.tell(PoisonPill.getInstance(), self()); context().become(shuttingDown); - }).build(); + }).build() + ); } public PartialFunction shuttingDown = @@ -263,29 +279,27 @@ public class ActorDocTest { public static class Cruncher extends AbstractActor { - @Override - public PartialFunction receive() { - return ReceiveBuilder. - matchEquals("crunch", s -> { }).build(); + public Cruncher() { + receive(ReceiveBuilder. + matchEquals("crunch", s -> { }).build() + ); } } static //#swapper - public class Swapper extends AbstractActor { - final LoggingAdapter log = Logging.getLogger(context().system(), this); - - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public class Swapper extends AbstractLoggingActor { + public Swapper() { + receive(ReceiveBuilder. matchEquals(Swap, s -> { - log.info("Hi"); + log().info("Hi"); context().become(ReceiveBuilder. matchEquals(Swap, x -> { - log.info("Ho"); + log().info("Ho"); context().unbecome(); // resets the latest 'become' (just for fun) }).build(), false); // push on top instead of replace - }).build(); + }).build() + ); } } @@ -312,7 +326,7 @@ public class ActorDocTest { public void creatingActorWithSystemActorOf() { //#system-actorOf // ActorSystem is a heavy object: create only one per application - final ActorSystem system = ActorSystem.create("MySystem"); + final ActorSystem system = ActorSystem.create("MySystem", config); final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor"); //#system-actorOf try { @@ -357,17 +371,14 @@ public class ActorDocTest { static //#receive-timeout public class ReceiveTimeoutActor extends AbstractActor { - public ReceiveTimeoutActor() { - // To set an initial delay - context().setReceiveTimeout(Duration.create("10 seconds")); - } //#receive-timeout ActorRef target = context().system().deadLetters(); //#receive-timeout + public ReceiveTimeoutActor() { + // To set an initial delay + context().setReceiveTimeout(Duration.create("10 seconds")); - @Override - public PartialFunction receive() { - return ReceiveBuilder. + receive(ReceiveBuilder. matchEquals("Hello", s -> { // To set in a response to a message context().setReceiveTimeout(Duration.create("1 second")); @@ -382,7 +393,8 @@ public class ActorDocTest { //#receive-timeout target.tell("timeout", self()); //#receive-timeout - }).build(); + }).build() + ); } } //#receive-timeout @@ -405,7 +417,7 @@ public class ActorDocTest { private PartialFunction angry; private PartialFunction happy; - { + public HotSwapActor() { angry = ReceiveBuilder. matchEquals("foo", s -> { @@ -422,16 +434,15 @@ public class ActorDocTest { matchEquals("foo", s -> { context().become(angry); }).build(); - } - public PartialFunction receive() { - return ReceiveBuilder. + receive(ReceiveBuilder. matchEquals("foo", s -> { context().become(angry); }). matchEquals("bar", s -> { context().become(happy); - }).build(); + }).build() + ); } } //#hot-swap-actor @@ -459,8 +470,8 @@ public class ActorDocTest { static //#stash public class ActorWithProtocol extends AbstractActorWithStash { - public PartialFunction receive() { - return ReceiveBuilder. + public ActorWithProtocol() { + receive(ReceiveBuilder. matchEquals("open", s -> { context().become(ReceiveBuilder. matchEquals("write", ws -> { /* do writing */ }). @@ -470,7 +481,8 @@ public class ActorDocTest { }). matchAny(msg -> stash()).build(), false); }). - matchAny(msg -> stash()).build(); + matchAny(msg -> stash()).build() + ); } } //#stash @@ -484,20 +496,20 @@ public class ActorDocTest { //#watch public class WatchActor extends AbstractActor { private final ActorRef child = context().actorOf(Props.empty(), "target"); - { - context().watch(child); // <-- this is the only call needed for registration - } private ActorRef lastSender = system.deadLetters(); - public PartialFunction receive() { - return ReceiveBuilder. + public WatchActor() { + context().watch(child); // <-- this is the only call needed for registration + + receive(ReceiveBuilder. matchEquals("kill", s -> { context().stop(child); lastSender = sender(); }). match(Terminated.class, t -> t.actor().equals(child), t -> { lastSender.tell("finished", self()); - }).build(); + }).build() + ); } } //#watch @@ -519,13 +531,11 @@ public class ActorDocTest { public class Follower extends AbstractActor { final Integer identifyId = 1; - { + public Follower(){ ActorSelection selection = context().actorSelection("/user/another"); selection.tell(new Identify(identifyId), self()); - } - public PartialFunction receive() { - return ReceiveBuilder. + receive(ReceiveBuilder. match(ActorIdentity.class, id -> id.getRef() != null, id -> { ActorRef ref = id.getRef(); context().watch(ref); @@ -533,7 +543,8 @@ public class ActorDocTest { }). match(ActorIdentity.class, id -> id.getRef() == null, id -> { context().stop(self()); - }).build(); + }).build() + ); } final PartialFunction active(final ActorRef another) { @@ -558,4 +569,73 @@ public class ActorDocTest { } }; } + + public static class NoReceiveActor extends AbstractActor { + } + + @Test + public void noReceiveActor() { + EventFilter ex1 = new ErrorFilter(ActorInitializationException.class); + EventFilter[] ignoreExceptions = { ex1 }; + try { + system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions))); + new JavaTestKit(system) {{ + final ActorRef victim = new EventFilter(ActorInitializationException.class) { + protected ActorRef run() { + return system.actorOf(Props.create(NoReceiveActor.class), "victim"); + } + }.message("Actor behavior has not been set with receive(...)").occurrences(1).exec(); + + assertEquals(true, victim.isTerminated()); + }}; + } finally { + system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions))); + } + } + + public static class MultipleReceiveActor extends AbstractActor { + public MultipleReceiveActor() { + receive(ReceiveBuilder. + match(String.class, s1 -> s1.toLowerCase().equals("become"), s1 -> { + sender().tell(s1.toUpperCase(), self()); + receive(ReceiveBuilder. + match(String.class, s2 -> { + sender().tell(s2.toLowerCase(), self()); + }).build() + ); + }). + match(String.class, s1 -> { + sender().tell(s1.toUpperCase(), self()); + }).build() + ); + } + } + + @Test + public void multipleReceiveActor() { + EventFilter ex1 = new ErrorFilter(IllegalActorStateException.class); + EventFilter[] ignoreExceptions = { ex1 }; + try { + system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions))); + new JavaTestKit(system) {{ + new EventFilter(IllegalActorStateException.class) { + protected Boolean run() { + ActorRef victim = system.actorOf(Props.create(MultipleReceiveActor.class), "victim2"); + victim.tell("Foo", getRef()); + expectMsgEquals("FOO"); + victim.tell("bEcoMe", getRef()); + expectMsgEquals("BECOME"); + victim.tell("Foo", getRef()); + // if it's upper case, then the actor was restarted + expectMsgEquals("FOO"); + return true; + } + }.message("Actor behavior has already been set with receive(...), " + + "use context().become(...) to change it later").occurrences(1).exec(); + }}; + } finally { + system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions))); + } + } + } diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java index c8c68948aa..db286d049d 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/FaultHandlingTest.java @@ -64,12 +64,12 @@ public class FaultHandlingTest { //#strategy - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public Supervisor() { + receive(ReceiveBuilder. match(Props.class, props -> { sender().tell(context().actorOf(props), self()); - }).build(); + }).build() + ); } } @@ -94,12 +94,12 @@ public class FaultHandlingTest { //#strategy2 - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public Supervisor2() { + receive(ReceiveBuilder. match(Props.class, props -> { sender().tell(context().actorOf(props), self()); - }).build(); + }).build() + ); } @Override @@ -115,12 +115,12 @@ public class FaultHandlingTest { public class Child extends AbstractActor { int state = 0; - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public Child() { + receive(ReceiveBuilder. match(Exception.class, exception -> { throw exception; }). match(Integer.class, i -> state = i). - matchEquals("get", s -> sender().tell(state, self())).build(); + matchEquals("get", s -> sender().tell(state, self())).build() + ); } } diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java index ee1d777271..67c22748b8 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java @@ -32,21 +32,21 @@ public class InitializationDocTest { } public static class MessageInitExample extends AbstractActor { - //#messageInit private String initializeMe = null; - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public MessageInitExample() { + //#messageInit + receive(ReceiveBuilder. matchEquals("init", m1 -> { initializeMe = "Up and running"; context().become(ReceiveBuilder. matchEquals("U OK?", m2 -> { sender().tell(initializeMe, self()); }).build()); - }).build(); + }).build() + //#messageInit + ); } - //#messageInit } @Test diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java index b82795f4f3..f77a1fa410 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java @@ -9,8 +9,6 @@ import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.pf.ReceiveBuilder; -import scala.PartialFunction; -import scala.runtime.BoxedUnit; //#imports @@ -18,9 +16,8 @@ import scala.runtime.BoxedUnit; public class MyActor extends AbstractActor { private final LoggingAdapter log = Logging.getLogger(context().system(), this); - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public MyActor() { + receive(ReceiveBuilder. match(String.class, s -> { log.info("Received String message: {}", s); //#my-actor @@ -29,7 +26,8 @@ public class MyActor extends AbstractActor { //#reply //#my-actor }). - matchAny(o -> log.info("received unknown message")).build(); + matchAny(o -> log.info("received unknown message")).build() + ); } } //#my-actor diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/SampleActor.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/SampleActor.java index 8f607331fb..61817c6d95 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/SampleActor.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/SampleActor.java @@ -18,9 +18,8 @@ public class SampleActor extends AbstractActor { context().unbecome(); }).build(); - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public SampleActor() { + receive(ReceiveBuilder. match(Double.class, d -> { sender().tell(d.isNaN() ? 0 : d, self()); }). @@ -30,7 +29,8 @@ public class SampleActor extends AbstractActor { match(String.class, s -> s.startsWith("guard"), s -> { sender().tell("startsWith(guard): " + s.toUpperCase(), self()); context().become(guarded, false); - }).build(); + }).build() + ); } } //#sample-actor diff --git a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java index ab0c4f98a5..1588709d9f 100644 --- a/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/japi/FaultHandlingDocSample.java @@ -73,9 +73,8 @@ public class FaultHandlingDocSample { context().setReceiveTimeout(Duration.create("15 seconds")); } - @Override - public PartialFunction receive() { - return LoggingReceive.create(ReceiveBuilder. + public Listener() { + receive(LoggingReceive.create(ReceiveBuilder. match(Progress.class, progress -> { log().info("Current progress: {} %", progress.percent); if (progress.percent >= 100.0) { @@ -87,7 +86,8 @@ public class FaultHandlingDocSample { // No progress within 15 seconds, ServiceUnavailable log().error("Shutting down due to unavailable service"); context().system().shutdown(); - }).build(), context()); + }).build(), context() + )); } } @@ -137,9 +137,8 @@ public class FaultHandlingDocSample { return strategy; } - @Override - public PartialFunction receive() { - return LoggingReceive.create(ReceiveBuilder. + public Worker() { + receive(LoggingReceive.create(ReceiveBuilder. matchEquals(Start, x -> progressListener == null, x -> { progressListener = sender(); context().system().scheduler().schedule( @@ -160,7 +159,8 @@ public class FaultHandlingDocSample { } }, context().dispatcher()), context().dispatcher()) .to(progressListener); - }).build(), context()); + }).build(), context()) + ); } } @@ -266,9 +266,8 @@ public class FaultHandlingDocSample { storage.tell(new Get(key), self()); } - @Override - public PartialFunction receive() { - return LoggingReceive.create(ReceiveBuilder. + public CounterService() { + receive(LoggingReceive.create(ReceiveBuilder. match(Entry.class, entry -> entry.key.equals(key) && counter == null, entry -> { // Reply from Storage of the initial value, now we can create the Counter final long value = entry.value; @@ -301,7 +300,8 @@ public class FaultHandlingDocSample { matchEquals(Reconnect, o -> { // Re-establish storage after the scheduled delay initStorage(); - }).build(), context()); + }).build(), context()) + ); } void forwardOrPlaceInBacklog(Object msg) { @@ -348,11 +348,8 @@ public class FaultHandlingDocSample { public Counter(String key, long initialValue) { this.key = key; this.count = initialValue; - } - @Override - public PartialFunction receive() { - return LoggingReceive.create(ReceiveBuilder. + receive(LoggingReceive.create(ReceiveBuilder. match(UseStorage.class, useStorage -> { storage = useStorage.storage; storeCount(); @@ -363,7 +360,8 @@ public class FaultHandlingDocSample { }). matchEquals(GetCurrentCount, gcc -> { sender().tell(new CurrentCount(key, count), self()); - }).build(), context()); + }).build(), context()) + ); } void storeCount() { @@ -435,9 +433,8 @@ public class FaultHandlingDocSample { final DummyDB db = DummyDB.instance; - @Override - public PartialFunction receive() { - return LoggingReceive.create(ReceiveBuilder. + public Storage() { + receive(LoggingReceive.create(ReceiveBuilder. match(Store.class, store -> { db.save(store.entry.key, store.entry.value); }). @@ -445,7 +442,8 @@ public class FaultHandlingDocSample { Long value = db.load(get.key); sender().tell(new Entry(get.key, value == null ? Long.valueOf(0L) : value), self()); - }).build(), context()); + }).build(), context()) + ); } } diff --git a/akka-samples/akka-sample-fsm-java-lambda/src/main/java/sample/become/DiningHakkersOnBecome.java b/akka-samples/akka-sample-fsm-java-lambda/src/main/java/sample/become/DiningHakkersOnBecome.java index 873d4655d0..ad2c866603 100644 --- a/akka-samples/akka-sample-fsm-java-lambda/src/main/java/sample/become/DiningHakkersOnBecome.java +++ b/akka-samples/akka-sample-fsm-java-lambda/src/main/java/sample/become/DiningHakkersOnBecome.java @@ -43,8 +43,8 @@ public class DiningHakkersOnBecome { }).build(); //A Chopstick begins its existence as available - public PartialFunction receive() { - return available; + public Chopstick() { + receive(available); } } @@ -125,11 +125,11 @@ public class DiningHakkersOnBecome { }).build(); //All hakkers start in a non-eating state - public PartialFunction receive() { - return ReceiveBuilder.matchEquals(Think, m -> { + public Hakker() { + receive(ReceiveBuilder.matchEquals(Think, m -> { System.out.println(String.format("%s starts to think", name)); startThinking(Duration.create(5, SECONDS)); - }).build(); + }).build()); } private void startThinking(FiniteDuration duration) { diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index 9bed488c8b..a4fff7f432 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -35,24 +35,25 @@ public class LambdaPersistenceDocTest { static Object o1 = new Object() { //#definition class MyProcessor extends AbstractProcessor { - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> { - // message successfully written to journal - Object payload = p.payload(); - Long sequenceNr = p.sequenceNr(); - // ... - }). - match(PersistenceFailure.class, failure -> { - // message failed to be written to journal - Object payload = failure.payload(); - Long sequenceNr = failure.sequenceNr(); - Throwable cause = failure.cause(); - // ... - }). - matchAny(otherwise -> { - // message not written to journal - }).build(); + public MyProcessor() { + receive(ReceiveBuilder. + match(Persistent.class, p -> { + // message successfully written to journal + Object payload = p.payload(); + Long sequenceNr = p.sequenceNr(); + // ... + }). + match(PersistenceFailure.class, failure -> { + // message failed to be written to journal + Object payload = failure.payload(); + Long sequenceNr = failure.sequenceNr(); + Throwable cause = failure.cause(); + // ... + }). + matchAny(otherwise -> { + // message not written to journal + }).build() + ); } } //#definition @@ -67,11 +68,10 @@ public class LambdaPersistenceDocTest { processor.tell(Persistent.create("foo"), null); processor.tell("bar", null); //#usage - } - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, received -> {/* ... */}).build(); + receive(ReceiveBuilder. + match(Persistent.class, received -> {/* ... */}).build() + ); } private void recover() { @@ -124,9 +124,10 @@ public class LambdaPersistenceDocTest { } //#processor-id-override - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, received -> {/* ... */}).build(); + public MyProcessor4() { + receive(ReceiveBuilder. + match(Persistent.class, received -> {/* ... */}).build() + ); } } }; @@ -140,27 +141,27 @@ public class LambdaPersistenceDocTest { public MyProcessor() { this.destination = context().actorOf(Props.create(MyDestination.class)); this.channel = context().actorOf(Channel.props(), "myChannel"); - } - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> { - Persistent out = p.withPayload("done " + p.payload()); - channel.tell(Deliver.create(out, destination.path()), self()); - }).build(); + receive(ReceiveBuilder. + match(Persistent.class, p -> { + Persistent out = p.withPayload("done " + p.payload()); + channel.tell(Deliver.create(out, destination.path()), self()); + }).build() + ); } } class MyDestination extends AbstractActor { - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(ConfirmablePersistent.class, p -> { - Object payload = p.payload(); - Long sequenceNr = p.sequenceNr(); - int redeliveries = p.redeliveries(); - // ... - p.confirm(); - }).build(); + public MyDestination() { + receive(ReceiveBuilder. + match(ConfirmablePersistent.class, p -> { + Object payload = p.payload(); + Long sequenceNr = p.sequenceNr(); + int redeliveries = p.redeliveries(); + // ... + p.confirm(); + }).build() + ); } } //#channel-example @@ -177,39 +178,38 @@ public class LambdaPersistenceDocTest { //#channel-custom-settings context().actorOf( - Channel.props(ChannelSettings.create() - .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS)) - .withRedeliverMax(15))); + Channel.props(ChannelSettings.create() + .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS)) + .withRedeliverMax(15))); //#channel-custom-settings //#channel-custom-listener class MyListener extends AbstractActor { - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(RedeliverFailure.class, r -> { - Iterable messages = r.getMessages(); - // ... - }).build(); + public MyListener() { + receive(ReceiveBuilder. + match(RedeliverFailure.class, r -> { + Iterable messages = r.getMessages(); + // ... + }).build() + ); } } final ActorRef myListener = context().actorOf(Props.create(MyListener.class)); context().actorOf(Channel.props( - ChannelSettings.create().withRedeliverFailureListener(null))); + ChannelSettings.create().withRedeliverFailureListener(null))); //#channel-custom-listener - } + receive(ReceiveBuilder. + match(Persistent.class, p -> { + Persistent out = p.withPayload("done " + p.payload()); + channel.tell(Deliver.create(out, destination.path()), self()); - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> { - Persistent out = p.withPayload("done " + p.payload()); - channel.tell(Deliver.create(out, destination.path()), self()); - - //#channel-example-reply - channel.tell(Deliver.create(out, sender().path()), self()); - //#channel-example-reply - }).build(); + //#channel-example-reply + channel.tell(Deliver.create(out, sender().path()), self()); + //#channel-example-reply + }).build() + ); } } }; @@ -219,18 +219,19 @@ public class LambdaPersistenceDocTest { class MyProcessor extends AbstractProcessor { private Object state; - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(String.class, s -> s.equals("snap"), - s -> saveSnapshot(state)). - match(SaveSnapshotSuccess.class, ss -> { - SnapshotMetadata metadata = ss.metadata(); - // ... - }). - match(SaveSnapshotFailure.class, sf -> { - SnapshotMetadata metadata = sf.metadata(); - // ... - }).build(); + public MyProcessor() { + receive(ReceiveBuilder. + match(String.class, s -> s.equals("snap"), + s -> saveSnapshot(state)). + match(SaveSnapshotSuccess.class, ss -> { + SnapshotMetadata metadata = ss.metadata(); + // ... + }). + match(SaveSnapshotFailure.class, sf -> { + SnapshotMetadata metadata = sf.metadata(); + // ... + }).build() + ); } } //#save-snapshot @@ -241,13 +242,14 @@ public class LambdaPersistenceDocTest { class MyProcessor extends AbstractProcessor { private Object state; - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(SnapshotOffer.class, s -> { - state = s.snapshot(); - // ... - }). - match(Persistent.class, p -> {/* ...*/}).build(); + public MyProcessor() { + receive(ReceiveBuilder. + match(SnapshotOffer.class, s -> { + state = s.snapshot(); + // ... + }). + match(Persistent.class, p -> {/* ...*/}).build() + ); } } //#snapshot-offer @@ -257,17 +259,16 @@ public class LambdaPersistenceDocTest { public MyActor() { processor = context().actorOf(Props.create(MyProcessor.class)); - } - - @Override public PartialFunction receive() { - return ReceiveBuilder.match(Object.class, o -> {/* ... */}).build(); + receive(ReceiveBuilder. + match(Object.class, o -> {/* ... */}).build() + ); } private void recover() { //#snapshot-criteria processor.tell(Recover.create( - SnapshotSelectionCriteria - .create(457L, System.currentTimeMillis())), null); + SnapshotSelectionCriteria + .create(457L, System.currentTimeMillis())), null); //#snapshot-criteria } } @@ -276,12 +277,13 @@ public class LambdaPersistenceDocTest { static Object o6 = new Object() { //#batch-write class MyProcessor extends AbstractProcessor { - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> p.payload().equals("a"), - p -> {/* ... */}). - match(Persistent.class, p -> p.payload().equals("b"), - p -> {/* ... */}).build(); + public MyProcessor() { + receive(ReceiveBuilder. + match(Persistent.class, p -> p.payload().equals("a"), + p -> {/* ... */}). + match(Persistent.class, p -> p.payload().equals("b"), + p -> {/* ... */}).build() + ); } } @@ -291,8 +293,8 @@ public class LambdaPersistenceDocTest { public void batchWrite() { processor.tell(PersistentBatch - .create(asList(Persistent.create("a"), - Persistent.create("b"))), null); + .create(asList(Persistent.create("a"), + Persistent.create("b"))), null); } // ... @@ -307,18 +309,18 @@ public class LambdaPersistenceDocTest { public void foo() { //#persistent-channel-example final ActorRef channel = context().actorOf( - PersistentChannel.props( - PersistentChannelSettings.create() - .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS)) - .withRedeliverMax(15)), - "myPersistentChannel"); + PersistentChannel.props( + PersistentChannelSettings.create() + .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS)) + .withRedeliverMax(15)), + "myPersistentChannel"); channel.tell(Deliver.create(Persistent.create("example"), destination.path()), self()); //#persistent-channel-example //#persistent-channel-watermarks PersistentChannelSettings.create() - .withPendingConfirmationsMax(10000) - .withPendingConfirmationsMin(2000); + .withPendingConfirmationsMax(10000) + .withPendingConfirmationsMin(2000); //#persistent-channel-watermarks //#persistent-channel-reply PersistentChannelSettings.create().withReplyPersistent(true); @@ -343,19 +345,19 @@ public class LambdaPersistenceDocTest { // ... // reliably deliver events channel.tell(Deliver.create( - Persistent.create(event, getCurrentPersistentMessage()), - destination.path()), self()); + Persistent.create(event, getCurrentPersistentMessage()), + destination.path()), self()); } @Override public PartialFunction receiveRecover() { return ReceiveBuilder. - match(String.class, this::handleEvent).build(); + match(String.class, this::handleEvent).build(); } @Override public PartialFunction receiveCommand() { return ReceiveBuilder. - match(String.class, s -> s.equals("cmd"), - s -> persist("evt", this::handleEvent)).build(); + match(String.class, s -> s.equals("cmd"), + s -> persist("evt", this::handleEvent)).build(); } } //#reliable-event-delivery @@ -369,11 +371,12 @@ public class LambdaPersistenceDocTest { return "some-processor-id"; } - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, peristent -> { - // ... - }).build(); + public MyView() { + receive(ReceiveBuilder. + match(Persistent.class, persistent -> { + // ... + }).build() + ); } } //#view diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index 8cb5e4de43..3999650804 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -37,16 +37,17 @@ public class LambdaPersistencePluginDocTest { selection.tell(new Identify(1), self()); } - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(ActorIdentity.class, ai -> { - if (ai.correlationId().equals(1)) { - ActorRef store = ai.getRef(); - if (store != null) { - SharedLeveldbJournal.setStore(store, context().system()); - } + public SharedStorageUsage() { + receive(ReceiveBuilder. + match(ActorIdentity.class, ai -> { + if (ai.correlationId().equals(1)) { + ActorRef store = ai.getRef(); + if (store != null) { + SharedLeveldbJournal.setStore(store, context().system()); } - }).build(); + } + }).build() + ); } } //#shared-store-usage diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ConversationRecoveryExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ConversationRecoveryExample.java index 50e59c641b..7a748f9374 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ConversationRecoveryExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ConversationRecoveryExample.java @@ -13,62 +13,64 @@ import scala.PartialFunction; import scala.runtime.BoxedUnit; public class ConversationRecoveryExample { - public static String PING = "PING"; - public static String PONG = "PONG"; + public static String PING = "PING"; + public static String PONG = "PONG"; - public static class Ping extends AbstractProcessor { - final ActorRef pongChannel = context().actorOf(Channel.props(), "pongChannel"); - int counter = 0; + public static class Ping extends AbstractProcessor { + final ActorRef pongChannel = context().actorOf(Channel.props(), "pongChannel"); + int counter = 0; - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(ConfirmablePersistent.class, cp -> cp.payload().equals(PING), cp -> { - counter += 1; - System.out.println(String.format("received ping %d times", counter)); - cp.confirm(); - if (!recoveryRunning()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - pongChannel.tell(Deliver.create(cp.withPayload(PONG), sender().path()), self()); - }). - match(String.class, - s -> s.equals("init"), - s -> pongChannel.tell(Deliver.create(Persistent.create(PONG), sender().path()), self())).build(); - } + public Ping() { + receive(ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> cp.payload().equals(PING), cp -> { + counter += 1; + System.out.println(String.format("received ping %d times", counter)); + cp.confirm(); + if (!recoveryRunning()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + pongChannel.tell(Deliver.create(cp.withPayload(PONG), sender().path()), self()); + }). + match(String.class, + s -> s.equals("init"), + s -> pongChannel.tell(Deliver.create(Persistent.create(PONG), sender().path()), self())).build() + ); } + } - public static class Pong extends AbstractProcessor { - private final ActorRef pingChannel = context().actorOf(Channel.props(), "pingChannel"); - private int counter = 0; + public static class Pong extends AbstractProcessor { + private final ActorRef pingChannel = context().actorOf(Channel.props(), "pingChannel"); + private int counter = 0; - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(ConfirmablePersistent.class, cp -> cp.payload().equals(PONG), cp -> { - counter += 1; - System.out.println(String.format("received pong %d times", counter)); - cp.confirm(); - if (!recoveryRunning()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - pingChannel.tell(Deliver.create(cp.withPayload(PING), sender().path()), self()); - }).build(); - } + public Pong() { + receive(ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> cp.payload().equals(PONG), cp -> { + counter += 1; + System.out.println(String.format("received pong %d times", counter)); + cp.confirm(); + if (!recoveryRunning()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + pingChannel.tell(Deliver.create(cp.withPayload(PING), sender().path()), self()); + }).build() + ); } + } - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); - final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping"); - final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong"); + final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping"); + final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong"); - ping.tell("init", pong); - } + ping.tell("init", pong); + } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java index 96351c3c14..a903e30447 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java @@ -80,13 +80,15 @@ class ExampleProcessor extends AbstractEventsourcedProcessor { return state.size(); } - @Override public PartialFunction receiveRecover() { + @Override + public PartialFunction receiveRecover() { return ReceiveBuilder. match(Evt.class, state::update). match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()).build(); } - @Override public PartialFunction receiveCommand() { + @Override + public PartialFunction receiveCommand() { return ReceiveBuilder.match(Cmd.class, c -> { final String data = c.getData(); final Evt evt1 = new Evt(data + "-" + getNumEvents()); diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java index 1982413125..2acffd299f 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java @@ -14,45 +14,45 @@ import scala.PartialFunction; import scala.runtime.BoxedUnit; public class ProcessorChannelExample { - public static class ExampleProcessor extends AbstractProcessor { - private ActorRef destination; - private ActorRef channel; + public static class ExampleProcessor extends AbstractProcessor { + private ActorRef destination; + private ActorRef channel; - public ExampleProcessor(ActorRef destination) { - this.destination = destination; - this.channel = context().actorOf(Channel.props(), "channel"); - } + public ExampleProcessor(ActorRef destination) { + this.destination = destination; + this.channel = context().actorOf(Channel.props(), "channel"); - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> { - System.out.println("processed " + p.payload()); - channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self()); - }). - match(String.class, s -> System.out.println("reply = " + s)).build(); - } + receive(ReceiveBuilder. + match(Persistent.class, p -> { + System.out.println("processed " + p.payload()); + channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self()); + }). + match(String.class, s -> System.out.println("reply = " + s)).build() + ); } + } - public static class ExampleDestination extends AbstractActor { - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(ConfirmablePersistent.class, cp -> { - System.out.println("received " + cp.payload()); - sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null); - cp.confirm(); - }).build(); - } + public static class ExampleDestination extends AbstractActor { + public ExampleDestination() { + receive(ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> { + System.out.println("received " + cp.payload()); + sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null); + cp.confirm(); + }).build() + ); } + } - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); - final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class)); - final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1"); + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class)); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1"); - processor.tell(Persistent.create("a"), null); - processor.tell(Persistent.create("b"), null); + processor.tell(Persistent.create("a"), null); + processor.tell(Persistent.create("b"), null); - Thread.sleep(1000); - system.shutdown(); - } + Thread.sleep(1000); + system.shutdown(); + } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java index 9b40a6f479..79bf884046 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java @@ -17,56 +17,57 @@ import scala.runtime.BoxedUnit; import java.util.ArrayList; public class ProcessorFailureExample { - public static class ExampleProcessor extends AbstractProcessor { - private ArrayList received = new ArrayList(); + public static class ExampleProcessor extends AbstractProcessor { + private ArrayList received = new ArrayList(); - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> p.payload().equals("boom"), p -> {throw new RuntimeException("boom");}). - match(Persistent.class, p -> !p.payload().equals("boom"), p -> received.add(p.payload())). - match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}). - match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).build(); - } - - @Override - public void preRestart(Throwable reason, Option message) { - if (message.isDefined() && message.get() instanceof Persistent) { - deleteMessage(((Persistent) message.get()).sequenceNr(), false); - } - super.preRestart(reason, message); - } + public ExampleProcessor() { + receive(ReceiveBuilder. + match(Persistent.class, p -> p.payload().equals("boom"), p -> {throw new RuntimeException("boom");}). + match(Persistent.class, p -> !p.payload().equals("boom"), p -> received.add(p.payload())). + match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}). + match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).build() + ); } - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); - final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2"); - - processor.tell(Persistent.create("a"), null); - processor.tell("print", null); - processor.tell("boom", null); - processor.tell("print", null); - processor.tell(Persistent.create("b"), null); - processor.tell("print", null); - processor.tell(Persistent.create("boom"), null); - processor.tell("print", null); - processor.tell(Persistent.create("c"), null); - processor.tell("print", null); - - // Will print in a first run (i.e. with empty journal): - - // received [a] - // received [a, b] - // received [a, b, c] - - // Will print in a second run: - - // received [a, b, c, a] - // received [a, b, c, a, b] - // received [a, b, c, a, b, c] - - // etc ... - - Thread.sleep(1000); - system.shutdown(); + @Override + public void preRestart(Throwable reason, Option message) { + if (message.isDefined() && message.get() instanceof Persistent) { + deleteMessage(((Persistent) message.get()).sequenceNr(), false); + } + super.preRestart(reason, message); } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2"); + + processor.tell(Persistent.create("a"), null); + processor.tell("print", null); + processor.tell("boom", null); + processor.tell("print", null); + processor.tell(Persistent.create("b"), null); + processor.tell("print", null); + processor.tell(Persistent.create("boom"), null); + processor.tell("print", null); + processor.tell(Persistent.create("c"), null); + processor.tell("print", null); + + // Will print in a first run (i.e. with empty journal): + + // received [a] + // received [a, b] + // received [a, b, c] + + // Will print in a second run: + + // received [a, b, c, a] + // received [a, b, c, a, b] + // received [a, b, c, a, b, c] + + // etc ... + + Thread.sleep(1000); + system.shutdown(); + } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java index e62cc0bdd4..fda5598676 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java @@ -18,62 +18,63 @@ import java.io.Serializable; import java.util.ArrayList; public class SnapshotExample { - public static class ExampleState implements Serializable { - private final ArrayList received; + public static class ExampleState implements Serializable { + private final ArrayList received; - public ExampleState() { - this(new ArrayList()); - } - - public ExampleState(ArrayList received) { - this.received = received; - } - - public ExampleState copy() { - return new ExampleState(new ArrayList(received)); - } - - public void update(String s) { - received.add(s); - } - - @Override - public String toString() { - return received.toString(); - } + public ExampleState() { + this(new ArrayList()); } - public static class ExampleProcessor extends AbstractProcessor { - private ExampleState state = new ExampleState(); - - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> state.update(String.format("%s-%d", p.payload(), p.sequenceNr()))). - match(SnapshotOffer.class, s -> { - ExampleState exState = (ExampleState) s.snapshot(); - System.out.println("offered state = " + exState); - state = exState; - }). - match(String.class, s -> s.equals("print"), s -> System.out.println("current state = " + state)). - match(String.class, s -> s.equals("snap"), s -> - // IMPORTANT: create a copy of snapshot - // because ExampleState is mutable !!! - saveSnapshot(state.copy())).build(); - } + public ExampleState(ArrayList received) { + this.received = received; } - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); - final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java"); - - processor.tell(Persistent.create("a"), null); - processor.tell(Persistent.create("b"), null); - processor.tell("snap", null); - processor.tell(Persistent.create("c"), null); - processor.tell(Persistent.create("d"), null); - processor.tell("print", null); - - Thread.sleep(1000); - system.shutdown(); + public ExampleState copy() { + return new ExampleState(new ArrayList(received)); } + + public void update(String s) { + received.add(s); + } + + @Override + public String toString() { + return received.toString(); + } + } + + public static class ExampleProcessor extends AbstractProcessor { + private ExampleState state = new ExampleState(); + + public ExampleProcessor() { + receive(ReceiveBuilder. + match(Persistent.class, p -> state.update(String.format("%s-%d", p.payload(), p.sequenceNr()))). + match(SnapshotOffer.class, s -> { + ExampleState exState = (ExampleState) s.snapshot(); + System.out.println("offered state = " + exState); + state = exState; + }). + match(String.class, s -> s.equals("print"), s -> System.out.println("current state = " + state)). + match(String.class, s -> s.equals("snap"), s -> + // IMPORTANT: create a copy of snapshot + // because ExampleState is mutable !!! + saveSnapshot(state.copy())).build() + ); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java"); + + processor.tell(Persistent.create("a"), null); + processor.tell(Persistent.create("b"), null); + processor.tell("snap", null); + processor.tell(Persistent.create("c"), null); + processor.tell(Persistent.create("d"), null); + processor.tell("print", null); + + Thread.sleep(1000); + system.shutdown(); + } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java index eb28d94063..7344f5efb5 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java @@ -17,84 +17,87 @@ import scala.runtime.BoxedUnit; import java.util.concurrent.TimeUnit; public class ViewExample { - public static class ExampleProcessor extends AbstractProcessor { - @Override - public String processorId() { - return "processor-5"; - } - - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, - p -> System.out.println(String.format("processor received %s (sequence nr = %d)", - p.payload(), - p.sequenceNr()))).build(); - } + public static class ExampleProcessor extends AbstractProcessor { + @Override + public String processorId() { + return "processor-5"; } - public static class ExampleView extends AbstractView { - private final ActorRef destination = context().actorOf(Props.create(ExampleDestination.class)); - private final ActorRef channel = context().actorOf(Channel.props("channel")); + public ExampleProcessor() { + receive(ReceiveBuilder. + match(Persistent.class, + p -> System.out.println(String.format("processor received %s (sequence nr = %d)", + p.payload(), + p.sequenceNr()))).build() + ); + } + } - private int numReplicated = 0; + public static class ExampleView extends AbstractView { + private final ActorRef destination = context().actorOf(Props.create(ExampleDestination.class)); + private final ActorRef channel = context().actorOf(Channel.props("channel")); - @Override - public String viewId() { - return "view-5"; - } + private int numReplicated = 0; - @Override - public String processorId() { - return "processor-5"; - } - - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(Persistent.class, p -> { - numReplicated += 1; - System.out.println(String.format("view received %s (sequence nr = %d, num replicated = %d)", - p.payload(), - p.sequenceNr(), - numReplicated)); - channel.tell(Deliver.create(p.withPayload("replicated-" + p.payload()), destination.path()), - self()); - }). - match(SnapshotOffer.class, so -> { - numReplicated = (Integer) so.snapshot(); - System.out.println(String.format("view received snapshot offer %s (metadata = %s)", - numReplicated, - so.metadata())); - }). - match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build(); - } + @Override + public String viewId() { + return "view-5"; } - public static class ExampleDestination extends AbstractActor { - - @Override public PartialFunction receive() { - return ReceiveBuilder. - match(ConfirmablePersistent.class, cp -> { - System.out.println(String.format("destination received %s (sequence nr = %s)", - cp.payload(), - cp.sequenceNr())); - cp.confirm(); - }).build(); - } + @Override + public String processorId() { + return "processor-5"; } - public static void main(String... args) throws Exception { - final ActorSystem system = ActorSystem.create("example"); - final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class)); - final ActorRef view = system.actorOf(Props.create(ExampleView.class)); - - system.scheduler() - .schedule(Duration.Zero(), - Duration.create(2, TimeUnit.SECONDS), - processor, - Persistent.create("scheduled"), - system.dispatcher(), - null); - system.scheduler() - .schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null); + public ExampleView() { + receive(ReceiveBuilder. + match(Persistent.class, p -> { + numReplicated += 1; + System.out.println(String.format("view received %s (sequence nr = %d, num replicated = %d)", + p.payload(), + p.sequenceNr(), + numReplicated)); + channel.tell(Deliver.create(p.withPayload("replicated-" + p.payload()), destination.path()), + self()); + }). + match(SnapshotOffer.class, so -> { + numReplicated = (Integer) so.snapshot(); + System.out.println(String.format("view received snapshot offer %s (metadata = %s)", + numReplicated, + so.metadata())); + }). + match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build() + ); } + } + + public static class ExampleDestination extends AbstractActor { + + public ExampleDestination() { + receive(ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> { + System.out.println(String.format("destination received %s (sequence nr = %s)", + cp.payload(), + cp.sequenceNr())); + cp.confirm(); + }).build() + ); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class)); + final ActorRef view = system.actorOf(Props.create(ExampleView.class)); + + system.scheduler() + .schedule(Duration.Zero(), + Duration.create(2, TimeUnit.SECONDS), + processor, + Persistent.create("scheduled"), + system.dispatcher(), + null); + system.scheduler() + .schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null); + } } diff --git a/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/ArithmeticService.java b/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/ArithmeticService.java index 9e982076e3..bbfc7d2d9d 100644 --- a/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/ArithmeticService.java +++ b/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/ArithmeticService.java @@ -61,15 +61,15 @@ class ArithmeticService extends AbstractLoggingActor { } } - @Override - public PartialFunction receive() { - return ReceiveBuilder. + ArithmeticService() { + receive(ReceiveBuilder. match(Expression.class, expr -> { // We delegate the dangerous task of calculation to a worker, passing the // expression as a constructor argument to the actor. ActorRef worker = context().actorOf(FlakyExpressionCalculator.props(expr, Left)); pendingWorkers.put(worker, sender()); }). - match(Result.class, r -> notifyConsumerSuccess(sender(), r.getValue())).build(); + match(Result.class, r -> notifyConsumerSuccess(sender(), r.getValue())).build() + ); } -} \ No newline at end of file +} diff --git a/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/FlakyExpressionCalculator.java b/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/FlakyExpressionCalculator.java index 3afd7201a7..4efb4361c8 100644 --- a/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/FlakyExpressionCalculator.java +++ b/akka-samples/akka-sample-supervision-java-lambda/src/main/java/supervision/FlakyExpressionCalculator.java @@ -67,11 +67,6 @@ public class FlakyExpressionCalculator extends AbstractLoggingActor { private final Expression expr; private final Position myPosition; - public FlakyExpressionCalculator(Expression expr, Position myPosition) { - this.expr = expr; - this.myPosition = myPosition; - } - private Expression getExpr() { return expr; } @@ -107,9 +102,11 @@ public class FlakyExpressionCalculator extends AbstractLoggingActor { } } - @Override - public PartialFunction receive() { - return ReceiveBuilder. + public FlakyExpressionCalculator(Expression expr, Position myPosition) { + this.expr = expr; + this.myPosition = myPosition; + + receive(ReceiveBuilder. match(Result.class, r -> expected.contains(r.getPosition()), r -> { expected.remove(r.getPosition()); results.put(r.getPosition(), r.getValue()); @@ -126,7 +123,7 @@ public class FlakyExpressionCalculator extends AbstractLoggingActor { throw new IllegalStateException("Expected results for positions " + expected.stream().map(Object::toString).collect(Collectors.joining(", ")) + " but got position " + r.getPosition()); - }).build(); + }).build()); } private Integer evaluate(Expression expr, Integer left, Integer right) {