Merge pull request #2094 from akka/wip-3940-add-sugar-to-abstract-actor-master-ban

+act,sam,doc #3940 Added receive setter for Java Lambda actors
This commit is contained in:
Björn Antonsson 2014-03-22 17:27:59 +01:00
commit 8f7faf7898
23 changed files with 683 additions and 570 deletions

View file

@ -14,8 +14,8 @@ package akka.japi.pf;
* Example:
* <pre>
* @Override
* public PartialFunction<Object, BoxedUnit> receive() {
* return ReceiveBuilder.
* public Actor() {
* receive(ReceiveBuilder.
* match(Double.class, d -> {
* sender().tell(d.isNaN() ? 0 : d, self());
* }).
@ -24,7 +24,8 @@ package akka.japi.pf;
* }).
* match(String.class, s -> s.startsWith("foo"), s -> {
* sender().tell(s.toUpperCase(), self());
* }).build();
* }).build()
* );
* }
* </pre>
*

View file

@ -25,9 +25,9 @@ object AbstractActor {
* <pre>
* public class MyActor extends AbstractActor {
* int count = 0;
* @Override
* public PartialFunction<Object, BoxedUnit> receive() {
* return ReceiveBuilder.
*
* public MyActor() {
* receive(ReceiveBuilder.
* match(Double.class, d -> {
* sender().tell(d.isNaN() ? 0 : d, self());
* }).
@ -36,7 +36,8 @@ object AbstractActor {
* }).
* match(String.class, s -> s.startsWith("foo"), s -> {
* sender().tell(s.toUpperCase(), self());
* }).build();
* }).build()
* );
* }
* }
* </pre>
@ -44,12 +45,30 @@ object AbstractActor {
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
abstract class AbstractActor extends Actor {
private var _receive: Receive = null
/**
* Set up the initial receive behavior of the Actor.
*
* @param receive The receive behavior.
*/
@throws(classOf[IllegalActorStateException])
protected def receive(receive: Receive): Unit =
if (_receive == null) _receive = receive
else throw IllegalActorStateException("Actor behavior has already been set with receive(...), " +
"use context().become(...) to change it later")
/**
* Returns this AbstractActor's AbstractActorContext
* The AbstractActorContext is not thread safe so do not expose it outside of the
* AbstractActor.
*/
def getContext(): AbstractActorContext = context.asInstanceOf[AbstractActorContext]
override def receive =
if (_receive != null) _receive
else throw IllegalActorStateException("Actor behavior has not been set with receive(...)")
}
/**
@ -73,9 +92,9 @@ abstract class AbstractLoggingActor extends AbstractActor with ActorLogging
* <pre>
* public class MyActorWithStash extends AbstractActorWithStash {
* int count = 0;
* @Override
* public PartialFunction<Object, BoxedUnit> receive() {
* return ReceiveBuilder.match(String.class, s -> {
*
* public MyActorWithStash() {
* receive(ReceiveBuilder. match(String.class, s -> {
* if (count < 0) {
* sender().tell(new Integer(s.length()), self());
* } else if (count == 2) {
@ -84,8 +103,8 @@ abstract class AbstractLoggingActor extends AbstractActor with ActorLogging
* } else {
* count += 1;
* stash();
* }
* }).build();
* }}).build()
* );
* }
* }
* </pre>

View file

@ -38,11 +38,13 @@ Creating Actors
Defining an Actor class
-----------------------
Actor classes are implemented by extending the :class:`AbstractActor` class and implementing
the :meth:`receive` method. The :meth:`receive` method should define a series of match
statements (which has the type ``PartialFunction<Object, BoxedUnit>``) that defines
which messages your Actor can handle, along with the implementation of how the
messages should be processed.
Actor classes are implemented by extending the :class:`AbstractActor` class and setting
the “initial behavior” in the constructor by calling the :meth:`receive` method in
the :class:`AbstractActor`.
The argument to the :meth:`receive` method is a ``PartialFunction<Object,BoxedUnit>``
that defines which messages your Actor can handle, along with the implementation of
how the messages should be processed.
Don't let the type signature scare you. To allow you to easily build up a partial
function there is a builder named ``ReceiveBuilder`` that you can use.
@ -64,7 +66,7 @@ Note further that the return type of the behavior defined above is ``Unit``; if
the actor shall reply to the received message then this must be done explicitly
as explained below.
The result of the :meth:`receive` method is a partial function object, which is
The argument to the :meth:`receive` method is a partial function object, which is
stored within the actor as its “initial behavior”, see `Become/Unbecome`_ for
further information on changing the behavior of an actor after its
construction.
@ -218,8 +220,8 @@ last line. Watching an actor is quite simple as well:
Actor API
=========
The :class:`AbstractActor` class defines only one abstract method, the above mentioned
:meth:`receive`, which implements the behavior of the actor.
The :class:`AbstractActor` class defines a method called :meth:`receive`,
that is used to set the “initial behavior” of the actor.
If the current actor behavior does not match a received message,
:meth:`unhandled` is called, which by default publishes an
@ -592,13 +594,21 @@ routers, load-balancers, replicators etc.
Receive messages
================
An Actor has to implement the ``receive`` method to receive messages:
An Actor either has to set its initial receive behavior in the constructor by
calling the :meth:`receive` method in the :class:`AbstractActor`:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java
:include: receive-constructor
:exclude: and-some-behavior
or by implementing the :meth:`receive` method in the :class:`Actor` interface:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#receive
The :meth:`receive` method should define a series of match statements (which has the type
``PartialFunction<Object, BoxedUnit>``) that defines which messages your Actor can handle,
along with the implementation of how the messages should be processed.
Both the argument to the :class:`AbstractActor` :meth:`receive` method and the return
type of the :class:`Actor` :meth:`receive` method is a ``PartialFunction<Object, BoxedUnit>``
that defines which messages your Actor can handle, along with the implementation of how the messages
should be processed.
Don't let the type signature scare you. To allow you to easily build up a partial
function there is a builder named ``ReceiveBuilder`` that you can use.
@ -765,9 +775,6 @@ behavior is not the default).
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#swapper
Stash
=====
Stash
=====

View file

@ -41,7 +41,7 @@ Architecture
============
* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal
before its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed
before its behavior is called. When a processor is started or restarted, journaled messages are replayed
to that processor, so that it can recover internal state from these messages.
* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
@ -71,13 +71,13 @@ Architecture
Processors
==========
A processor can be implemented by extending ``AbstractProcessor`` class and implementing the
``receive`` method.
A processor can be implemented by extending ``AbstractProcessor`` class and setting
the “initial behavior” in the constructor by calling the :meth:`receive` method
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#definition
Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message
When a processor's behavior is called with a ``Persistent`` message it can safely assume that this message
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
is stopped, by default. If a processor should continue running on persistence failures it must handle
``PersistenceFailure`` messages. In this case, a processor may want to inform the sender about the failure,
@ -164,9 +164,8 @@ Overriding ``processorId`` is the recommended way to generate stable identifiers
Views
=====
Views can be implemented by extending the ``AbstractView`` abstract class and implementing the ``receive`` and the
``processorId``
methods.
Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``processorId`` method
and setting the “initial behavior” in the constructor by calling the :meth:`receive` method.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view

View file

@ -222,7 +222,7 @@ private[persistence] trait Eventsourced extends Processor {
* An event sourced processor.
*/
trait EventsourcedProcessor extends Processor with Eventsourced {
final def receive = receiveCommand
def receive = receiveCommand
}
/**
@ -344,4 +344,10 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou
*/
final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persist(Util.immutableSeq(events))(event handler(event))
override def receive = super[EventsourcedProcessor].receive
override def receive(receive: Receive): Unit = {
throw new IllegalArgumentException("Define the behavior by overriding receiveRecover and receiveCommand")
}
}

View file

@ -388,28 +388,23 @@ final case class RecoveryException(message: String, cause: Throwable) extends Ak
abstract class UntypedProcessor extends UntypedActor with Processor
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]).
* Java API: compatible with lambda expressions
*
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types
* are not persisted.
*
* {{{
* import akka.persistence.AbstractProcessor;
* import akka.persistence.Persistent;
* import akka.actor.ActorRef;
* import akka.actor.Props;
* import akka.japi.pf.ReceiveBuilder;
* import scala.PartialFunction;
* import scala.runtime.BoxedUnit;
*
* <p/>
* Example:
* <pre>
* class MyProcessor extends AbstractProcessor {
* public PartialFunction<Object, BoxedUnit> receive() {
* return ReceiveBuilder.
* match(Persistent.class, p -> {
* Object payload = p.payload();
* Long sequenceNr = p.sequenceNr();
* public MyProcessor() {
* receive(ReceiveBuilder.
* match(Persistent.class, p -> {
* Object payload = p.payload();
* Long sequenceNr = p.sequenceNr();
* // ...
* }).build();
* }
* }).build()
* );
* }
* }
*
* // ...
@ -418,7 +413,7 @@ abstract class UntypedProcessor extends UntypedActor with Processor
*
* processor.tell(Persistent.create("foo"), null);
* processor.tell("bar", null);
* }}}
* </pre>
*
* During start and restart, persistent messages are replayed to a processor so that it can recover internal
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed

View file

@ -8,10 +8,16 @@ import akka.actor.*;
import akka.event.LoggingAdapter;
import akka.event.Logging;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actor.Messages.Swap.Swap;
import static docs.actor.Messages.*;
import static akka.japi.Util.immutableSeq;
import java.util.concurrent.TimeUnit;
@ -43,11 +49,19 @@ import static akka.pattern.Patterns.gracefulStop;
public class ActorDocTest {
public static Config config = ConfigFactory.parseString(
"akka {\n" +
" loggers = [\"akka.testkit.TestEventListener\"]\n" +
" loglevel = \"WARNING\"\n" +
" stdout-loglevel = \"WARNING\"\n" +
"}\n"
);
static ActorSystem system = null;
@BeforeClass
public static void beforeClass() {
system = ActorSystem.create("ActorDocTest");
system = ActorSystem.create("ActorDocTest", config);
}
@AfterClass
@ -61,18 +75,27 @@ public class ActorDocTest {
public class FirstActor extends AbstractActor {
final ActorRef child = context().actorOf(Props.create(MyActor.class), "myChild");
//#plus-some-behavior
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public FirstActor() {
receive(ReceiveBuilder.
matchAny(x -> {
sender().tell(x, self());
}).build();
}).build()
);
}
//#plus-some-behavior
}
//#context-actorOf
static public abstract class ReceiveActor extends AbstractActor {
static public abstract class SomeActor extends AbstractActor {
//#receive-constructor
public SomeActor() {
receive(ReceiveBuilder.
//#and-some-behavior
match(String.class, s -> System.out.println(s.toLowerCase())).
//#and-some-behavior
build());
}
//#receive-constructor
@Override
//#receive
public abstract PartialFunction<Object, BoxedUnit> receive();
@ -84,12 +107,9 @@ public class ActorDocTest {
ActorWithArgs(String args) {
this.args = args;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchAny(x -> { }).build();
receive(ReceiveBuilder.
matchAny(x -> { }).build()
);
}
}
@ -112,14 +132,11 @@ public class ActorDocTest {
DemoActor(Integer magicNumber) {
this.magicNumber = magicNumber;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
match(Integer.class, i -> {
sender().tell(i + magicNumber, self());
}).build();
}).build()
);
}
}
@ -131,9 +148,8 @@ public class ActorDocTest {
ActorRef demoActor = context().actorOf(DemoActor.props(42), "demo");
// ...
//#props-factory
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return emptyBehavior();
public SomeOtherActor() {
receive(emptyBehavior());
}
//#props-factory
}
@ -141,15 +157,15 @@ public class ActorDocTest {
public static class Hook extends AbstractActor {
ActorRef target = null;
public Hook() {
receive(emptyBehavior());
}
//#preStart
@Override
public void preStart() {
target = context().actorOf(Props.create(MyActor.class, "target"));
}
//#preStart
public PartialFunction<Object, BoxedUnit> receive() {
return emptyBehavior();
}
//#postStop
@Override
public void postStop() {
@ -191,9 +207,8 @@ public class ActorDocTest {
}
public static class ReplyException extends AbstractActor {
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public ReplyException() {
receive(ReceiveBuilder.
matchAny(x -> {
//#reply-exception
try {
@ -204,7 +219,8 @@ public class ActorDocTest {
throw e;
}
//#reply-exception
}).build();
}).build()
);
}
private String operation() {
@ -223,16 +239,16 @@ public class ActorDocTest {
private ActorRef worker =
context().watch(context().actorOf(Props.create(Cruncher.class), "worker"));
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Manager() {
receive(ReceiveBuilder.
matchEquals("job", s -> {
worker.tell("crunch", self());
}).
matchEquals(SHUTDOWN, x -> {
worker.tell(PoisonPill.getInstance(), self());
context().become(shuttingDown);
}).build();
}).build()
);
}
public PartialFunction<Object, BoxedUnit> shuttingDown =
@ -263,29 +279,27 @@ public class ActorDocTest {
public static class Cruncher extends AbstractActor {
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("crunch", s -> { }).build();
public Cruncher() {
receive(ReceiveBuilder.
matchEquals("crunch", s -> { }).build()
);
}
}
static
//#swapper
public class Swapper extends AbstractActor {
final LoggingAdapter log = Logging.getLogger(context().system(), this);
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public class Swapper extends AbstractLoggingActor {
public Swapper() {
receive(ReceiveBuilder.
matchEquals(Swap, s -> {
log.info("Hi");
log().info("Hi");
context().become(ReceiveBuilder.
matchEquals(Swap, x -> {
log.info("Ho");
log().info("Ho");
context().unbecome(); // resets the latest 'become' (just for fun)
}).build(), false); // push on top instead of replace
}).build();
}).build()
);
}
}
@ -312,7 +326,7 @@ public class ActorDocTest {
public void creatingActorWithSystemActorOf() {
//#system-actorOf
// ActorSystem is a heavy object: create only one per application
final ActorSystem system = ActorSystem.create("MySystem");
final ActorSystem system = ActorSystem.create("MySystem", config);
final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");
//#system-actorOf
try {
@ -357,17 +371,14 @@ public class ActorDocTest {
static
//#receive-timeout
public class ReceiveTimeoutActor extends AbstractActor {
public ReceiveTimeoutActor() {
// To set an initial delay
context().setReceiveTimeout(Duration.create("10 seconds"));
}
//#receive-timeout
ActorRef target = context().system().deadLetters();
//#receive-timeout
public ReceiveTimeoutActor() {
// To set an initial delay
context().setReceiveTimeout(Duration.create("10 seconds"));
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
matchEquals("Hello", s -> {
// To set in a response to a message
context().setReceiveTimeout(Duration.create("1 second"));
@ -382,7 +393,8 @@ public class ActorDocTest {
//#receive-timeout
target.tell("timeout", self());
//#receive-timeout
}).build();
}).build()
);
}
}
//#receive-timeout
@ -405,7 +417,7 @@ public class ActorDocTest {
private PartialFunction<Object, BoxedUnit> angry;
private PartialFunction<Object, BoxedUnit> happy;
{
public HotSwapActor() {
angry =
ReceiveBuilder.
matchEquals("foo", s -> {
@ -422,16 +434,15 @@ public class ActorDocTest {
matchEquals("foo", s -> {
context().become(angry);
}).build();
}
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
matchEquals("foo", s -> {
context().become(angry);
}).
matchEquals("bar", s -> {
context().become(happy);
}).build();
}).build()
);
}
}
//#hot-swap-actor
@ -459,8 +470,8 @@ public class ActorDocTest {
static
//#stash
public class ActorWithProtocol extends AbstractActorWithStash {
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public ActorWithProtocol() {
receive(ReceiveBuilder.
matchEquals("open", s -> {
context().become(ReceiveBuilder.
matchEquals("write", ws -> { /* do writing */ }).
@ -470,7 +481,8 @@ public class ActorDocTest {
}).
matchAny(msg -> stash()).build(), false);
}).
matchAny(msg -> stash()).build();
matchAny(msg -> stash()).build()
);
}
}
//#stash
@ -484,20 +496,20 @@ public class ActorDocTest {
//#watch
public class WatchActor extends AbstractActor {
private final ActorRef child = context().actorOf(Props.empty(), "target");
{
context().watch(child); // <-- this is the only call needed for registration
}
private ActorRef lastSender = system.deadLetters();
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public WatchActor() {
context().watch(child); // <-- this is the only call needed for registration
receive(ReceiveBuilder.
matchEquals("kill", s -> {
context().stop(child);
lastSender = sender();
}).
match(Terminated.class, t -> t.actor().equals(child), t -> {
lastSender.tell("finished", self());
}).build();
}).build()
);
}
}
//#watch
@ -519,13 +531,11 @@ public class ActorDocTest {
public class Follower extends AbstractActor {
final Integer identifyId = 1;
{
public Follower(){
ActorSelection selection = context().actorSelection("/user/another");
selection.tell(new Identify(identifyId), self());
}
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
receive(ReceiveBuilder.
match(ActorIdentity.class, id -> id.getRef() != null, id -> {
ActorRef ref = id.getRef();
context().watch(ref);
@ -533,7 +543,8 @@ public class ActorDocTest {
}).
match(ActorIdentity.class, id -> id.getRef() == null, id -> {
context().stop(self());
}).build();
}).build()
);
}
final PartialFunction<Object, BoxedUnit> active(final ActorRef another) {
@ -558,4 +569,73 @@ public class ActorDocTest {
}
};
}
public static class NoReceiveActor extends AbstractActor {
}
@Test
public void noReceiveActor() {
EventFilter ex1 = new ErrorFilter(ActorInitializationException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
final ActorRef victim = new EventFilter<ActorRef>(ActorInitializationException.class) {
protected ActorRef run() {
return system.actorOf(Props.create(NoReceiveActor.class), "victim");
}
}.message("Actor behavior has not been set with receive(...)").occurrences(1).exec();
assertEquals(true, victim.isTerminated());
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
}
public static class MultipleReceiveActor extends AbstractActor {
public MultipleReceiveActor() {
receive(ReceiveBuilder.
match(String.class, s1 -> s1.toLowerCase().equals("become"), s1 -> {
sender().tell(s1.toUpperCase(), self());
receive(ReceiveBuilder.
match(String.class, s2 -> {
sender().tell(s2.toLowerCase(), self());
}).build()
);
}).
match(String.class, s1 -> {
sender().tell(s1.toUpperCase(), self());
}).build()
);
}
}
@Test
public void multipleReceiveActor() {
EventFilter ex1 = new ErrorFilter(IllegalActorStateException.class);
EventFilter[] ignoreExceptions = { ex1 };
try {
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
new JavaTestKit(system) {{
new EventFilter<Boolean>(IllegalActorStateException.class) {
protected Boolean run() {
ActorRef victim = system.actorOf(Props.create(MultipleReceiveActor.class), "victim2");
victim.tell("Foo", getRef());
expectMsgEquals("FOO");
victim.tell("bEcoMe", getRef());
expectMsgEquals("BECOME");
victim.tell("Foo", getRef());
// if it's upper case, then the actor was restarted
expectMsgEquals("FOO");
return true;
}
}.message("Actor behavior has already been set with receive(...), " +
"use context().become(...) to change it later").occurrences(1).exec();
}};
} finally {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
}
}

View file

@ -64,12 +64,12 @@ public class FaultHandlingTest {
//#strategy
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Supervisor() {
receive(ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build();
}).build()
);
}
}
@ -94,12 +94,12 @@ public class FaultHandlingTest {
//#strategy2
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Supervisor2() {
receive(ReceiveBuilder.
match(Props.class, props -> {
sender().tell(context().actorOf(props), self());
}).build();
}).build()
);
}
@Override
@ -115,12 +115,12 @@ public class FaultHandlingTest {
public class Child extends AbstractActor {
int state = 0;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public Child() {
receive(ReceiveBuilder.
match(Exception.class, exception -> { throw exception; }).
match(Integer.class, i -> state = i).
matchEquals("get", s -> sender().tell(state, self())).build();
matchEquals("get", s -> sender().tell(state, self())).build()
);
}
}

View file

@ -32,21 +32,21 @@ public class InitializationDocTest {
}
public static class MessageInitExample extends AbstractActor {
//#messageInit
private String initializeMe = null;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public MessageInitExample() {
//#messageInit
receive(ReceiveBuilder.
matchEquals("init", m1 -> {
initializeMe = "Up and running";
context().become(ReceiveBuilder.
matchEquals("U OK?", m2 -> {
sender().tell(initializeMe, self());
}).build());
}).build();
}).build()
//#messageInit
);
}
//#messageInit
}
@Test

View file

@ -9,8 +9,6 @@ import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
//#imports
@ -18,9 +16,8 @@ import scala.runtime.BoxedUnit;
public class MyActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public MyActor() {
receive(ReceiveBuilder.
match(String.class, s -> {
log.info("Received String message: {}", s);
//#my-actor
@ -29,7 +26,8 @@ public class MyActor extends AbstractActor {
//#reply
//#my-actor
}).
matchAny(o -> log.info("received unknown message")).build();
matchAny(o -> log.info("received unknown message")).build()
);
}
}
//#my-actor

View file

@ -18,9 +18,8 @@ public class SampleActor extends AbstractActor {
context().unbecome();
}).build();
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public SampleActor() {
receive(ReceiveBuilder.
match(Double.class, d -> {
sender().tell(d.isNaN() ? 0 : d, self());
}).
@ -30,7 +29,8 @@ public class SampleActor extends AbstractActor {
match(String.class, s -> s.startsWith("guard"), s -> {
sender().tell("startsWith(guard): " + s.toUpperCase(), self());
context().become(guarded, false);
}).build();
}).build()
);
}
}
//#sample-actor

View file

@ -73,9 +73,8 @@ public class FaultHandlingDocSample {
context().setReceiveTimeout(Duration.create("15 seconds"));
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public Listener() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Progress.class, progress -> {
log().info("Current progress: {} %", progress.percent);
if (progress.percent >= 100.0) {
@ -87,7 +86,8 @@ public class FaultHandlingDocSample {
// No progress within 15 seconds, ServiceUnavailable
log().error("Shutting down due to unavailable service");
context().system().shutdown();
}).build(), context());
}).build(), context()
));
}
}
@ -137,9 +137,8 @@ public class FaultHandlingDocSample {
return strategy;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public Worker() {
receive(LoggingReceive.create(ReceiveBuilder.
matchEquals(Start, x -> progressListener == null, x -> {
progressListener = sender();
context().system().scheduler().schedule(
@ -160,7 +159,8 @@ public class FaultHandlingDocSample {
}
}, context().dispatcher()), context().dispatcher())
.to(progressListener);
}).build(), context());
}).build(), context())
);
}
}
@ -266,9 +266,8 @@ public class FaultHandlingDocSample {
storage.tell(new Get(key), self());
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public CounterService() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Entry.class, entry -> entry.key.equals(key) && counter == null, entry -> {
// Reply from Storage of the initial value, now we can create the Counter
final long value = entry.value;
@ -301,7 +300,8 @@ public class FaultHandlingDocSample {
matchEquals(Reconnect, o -> {
// Re-establish storage after the scheduled delay
initStorage();
}).build(), context());
}).build(), context())
);
}
void forwardOrPlaceInBacklog(Object msg) {
@ -348,11 +348,8 @@ public class FaultHandlingDocSample {
public Counter(String key, long initialValue) {
this.key = key;
this.count = initialValue;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
receive(LoggingReceive.create(ReceiveBuilder.
match(UseStorage.class, useStorage -> {
storage = useStorage.storage;
storeCount();
@ -363,7 +360,8 @@ public class FaultHandlingDocSample {
}).
matchEquals(GetCurrentCount, gcc -> {
sender().tell(new CurrentCount(key, count), self());
}).build(), context());
}).build(), context())
);
}
void storeCount() {
@ -435,9 +433,8 @@ public class FaultHandlingDocSample {
final DummyDB db = DummyDB.instance;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return LoggingReceive.create(ReceiveBuilder.
public Storage() {
receive(LoggingReceive.create(ReceiveBuilder.
match(Store.class, store -> {
db.save(store.entry.key, store.entry.value);
}).
@ -445,7 +442,8 @@ public class FaultHandlingDocSample {
Long value = db.load(get.key);
sender().tell(new Entry(get.key, value == null ?
Long.valueOf(0L) : value), self());
}).build(), context());
}).build(), context())
);
}
}

View file

@ -43,8 +43,8 @@ public class DiningHakkersOnBecome {
}).build();
//A Chopstick begins its existence as available
public PartialFunction<Object, BoxedUnit> receive() {
return available;
public Chopstick() {
receive(available);
}
}
@ -125,11 +125,11 @@ public class DiningHakkersOnBecome {
}).build();
//All hakkers start in a non-eating state
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.matchEquals(Think, m -> {
public Hakker() {
receive(ReceiveBuilder.matchEquals(Think, m -> {
System.out.println(String.format("%s starts to think", name));
startThinking(Duration.create(5, SECONDS));
}).build();
}).build());
}
private void startThinking(FiniteDuration duration) {

View file

@ -35,24 +35,25 @@ public class LambdaPersistenceDocTest {
static Object o1 = new Object() {
//#definition
class MyProcessor extends AbstractProcessor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
// message successfully written to journal
Object payload = p.payload();
Long sequenceNr = p.sequenceNr();
// ...
}).
match(PersistenceFailure.class, failure -> {
// message failed to be written to journal
Object payload = failure.payload();
Long sequenceNr = failure.sequenceNr();
Throwable cause = failure.cause();
// ...
}).
matchAny(otherwise -> {
// message not written to journal
}).build();
public MyProcessor() {
receive(ReceiveBuilder.
match(Persistent.class, p -> {
// message successfully written to journal
Object payload = p.payload();
Long sequenceNr = p.sequenceNr();
// ...
}).
match(PersistenceFailure.class, failure -> {
// message failed to be written to journal
Object payload = failure.payload();
Long sequenceNr = failure.sequenceNr();
Throwable cause = failure.cause();
// ...
}).
matchAny(otherwise -> {
// message not written to journal
}).build()
);
}
}
//#definition
@ -67,11 +68,10 @@ public class LambdaPersistenceDocTest {
processor.tell(Persistent.create("foo"), null);
processor.tell("bar", null);
//#usage
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, received -> {/* ... */}).build();
receive(ReceiveBuilder.
match(Persistent.class, received -> {/* ... */}).build()
);
}
private void recover() {
@ -124,9 +124,10 @@ public class LambdaPersistenceDocTest {
}
//#processor-id-override
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, received -> {/* ... */}).build();
public MyProcessor4() {
receive(ReceiveBuilder.
match(Persistent.class, received -> {/* ... */}).build()
);
}
}
};
@ -140,27 +141,27 @@ public class LambdaPersistenceDocTest {
public MyProcessor() {
this.destination = context().actorOf(Props.create(MyDestination.class));
this.channel = context().actorOf(Channel.props(), "myChannel");
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
Persistent out = p.withPayload("done " + p.payload());
channel.tell(Deliver.create(out, destination.path()), self());
}).build();
receive(ReceiveBuilder.
match(Persistent.class, p -> {
Persistent out = p.withPayload("done " + p.payload());
channel.tell(Deliver.create(out, destination.path()), self());
}).build()
);
}
}
class MyDestination extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, p -> {
Object payload = p.payload();
Long sequenceNr = p.sequenceNr();
int redeliveries = p.redeliveries();
// ...
p.confirm();
}).build();
public MyDestination() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, p -> {
Object payload = p.payload();
Long sequenceNr = p.sequenceNr();
int redeliveries = p.redeliveries();
// ...
p.confirm();
}).build()
);
}
}
//#channel-example
@ -177,39 +178,38 @@ public class LambdaPersistenceDocTest {
//#channel-custom-settings
context().actorOf(
Channel.props(ChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)));
Channel.props(ChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)));
//#channel-custom-settings
//#channel-custom-listener
class MyListener extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(RedeliverFailure.class, r -> {
Iterable<ConfirmablePersistent> messages = r.getMessages();
// ...
}).build();
public MyListener() {
receive(ReceiveBuilder.
match(RedeliverFailure.class, r -> {
Iterable<ConfirmablePersistent> messages = r.getMessages();
// ...
}).build()
);
}
}
final ActorRef myListener = context().actorOf(Props.create(MyListener.class));
context().actorOf(Channel.props(
ChannelSettings.create().withRedeliverFailureListener(null)));
ChannelSettings.create().withRedeliverFailureListener(null)));
//#channel-custom-listener
}
receive(ReceiveBuilder.
match(Persistent.class, p -> {
Persistent out = p.withPayload("done " + p.payload());
channel.tell(Deliver.create(out, destination.path()), self());
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
Persistent out = p.withPayload("done " + p.payload());
channel.tell(Deliver.create(out, destination.path()), self());
//#channel-example-reply
channel.tell(Deliver.create(out, sender().path()), self());
//#channel-example-reply
}).build();
//#channel-example-reply
channel.tell(Deliver.create(out, sender().path()), self());
//#channel-example-reply
}).build()
);
}
}
};
@ -219,18 +219,19 @@ public class LambdaPersistenceDocTest {
class MyProcessor extends AbstractProcessor {
private Object state;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(String.class, s -> s.equals("snap"),
s -> saveSnapshot(state)).
match(SaveSnapshotSuccess.class, ss -> {
SnapshotMetadata metadata = ss.metadata();
// ...
}).
match(SaveSnapshotFailure.class, sf -> {
SnapshotMetadata metadata = sf.metadata();
// ...
}).build();
public MyProcessor() {
receive(ReceiveBuilder.
match(String.class, s -> s.equals("snap"),
s -> saveSnapshot(state)).
match(SaveSnapshotSuccess.class, ss -> {
SnapshotMetadata metadata = ss.metadata();
// ...
}).
match(SaveSnapshotFailure.class, sf -> {
SnapshotMetadata metadata = sf.metadata();
// ...
}).build()
);
}
}
//#save-snapshot
@ -241,13 +242,14 @@ public class LambdaPersistenceDocTest {
class MyProcessor extends AbstractProcessor {
private Object state;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(SnapshotOffer.class, s -> {
state = s.snapshot();
// ...
}).
match(Persistent.class, p -> {/* ...*/}).build();
public MyProcessor() {
receive(ReceiveBuilder.
match(SnapshotOffer.class, s -> {
state = s.snapshot();
// ...
}).
match(Persistent.class, p -> {/* ...*/}).build()
);
}
}
//#snapshot-offer
@ -257,17 +259,16 @@ public class LambdaPersistenceDocTest {
public MyActor() {
processor = context().actorOf(Props.create(MyProcessor.class));
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.match(Object.class, o -> {/* ... */}).build();
receive(ReceiveBuilder.
match(Object.class, o -> {/* ... */}).build()
);
}
private void recover() {
//#snapshot-criteria
processor.tell(Recover.create(
SnapshotSelectionCriteria
.create(457L, System.currentTimeMillis())), null);
SnapshotSelectionCriteria
.create(457L, System.currentTimeMillis())), null);
//#snapshot-criteria
}
}
@ -276,12 +277,13 @@ public class LambdaPersistenceDocTest {
static Object o6 = new Object() {
//#batch-write
class MyProcessor extends AbstractProcessor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> p.payload().equals("a"),
p -> {/* ... */}).
match(Persistent.class, p -> p.payload().equals("b"),
p -> {/* ... */}).build();
public MyProcessor() {
receive(ReceiveBuilder.
match(Persistent.class, p -> p.payload().equals("a"),
p -> {/* ... */}).
match(Persistent.class, p -> p.payload().equals("b"),
p -> {/* ... */}).build()
);
}
}
@ -291,8 +293,8 @@ public class LambdaPersistenceDocTest {
public void batchWrite() {
processor.tell(PersistentBatch
.create(asList(Persistent.create("a"),
Persistent.create("b"))), null);
.create(asList(Persistent.create("a"),
Persistent.create("b"))), null);
}
// ...
@ -307,18 +309,18 @@ public class LambdaPersistenceDocTest {
public void foo() {
//#persistent-channel-example
final ActorRef channel = context().actorOf(
PersistentChannel.props(
PersistentChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)),
"myPersistentChannel");
PersistentChannel.props(
PersistentChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)),
"myPersistentChannel");
channel.tell(Deliver.create(Persistent.create("example"), destination.path()), self());
//#persistent-channel-example
//#persistent-channel-watermarks
PersistentChannelSettings.create()
.withPendingConfirmationsMax(10000)
.withPendingConfirmationsMin(2000);
.withPendingConfirmationsMax(10000)
.withPendingConfirmationsMin(2000);
//#persistent-channel-watermarks
//#persistent-channel-reply
PersistentChannelSettings.create().withReplyPersistent(true);
@ -343,19 +345,19 @@ public class LambdaPersistenceDocTest {
// ...
// reliably deliver events
channel.tell(Deliver.create(
Persistent.create(event, getCurrentPersistentMessage()),
destination.path()), self());
Persistent.create(event, getCurrentPersistentMessage()),
destination.path()), self());
}
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, this::handleEvent).build();
match(String.class, this::handleEvent).build();
}
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, s -> s.equals("cmd"),
s -> persist("evt", this::handleEvent)).build();
match(String.class, s -> s.equals("cmd"),
s -> persist("evt", this::handleEvent)).build();
}
}
//#reliable-event-delivery
@ -369,11 +371,12 @@ public class LambdaPersistenceDocTest {
return "some-processor-id";
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, peristent -> {
// ...
}).build();
public MyView() {
receive(ReceiveBuilder.
match(Persistent.class, persistent -> {
// ...
}).build()
);
}
}
//#view

View file

@ -37,16 +37,17 @@ public class LambdaPersistencePluginDocTest {
selection.tell(new Identify(1), self());
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ActorIdentity.class, ai -> {
if (ai.correlationId().equals(1)) {
ActorRef store = ai.getRef();
if (store != null) {
SharedLeveldbJournal.setStore(store, context().system());
}
public SharedStorageUsage() {
receive(ReceiveBuilder.
match(ActorIdentity.class, ai -> {
if (ai.correlationId().equals(1)) {
ActorRef store = ai.getRef();
if (store != null) {
SharedLeveldbJournal.setStore(store, context().system());
}
}).build();
}
}).build()
);
}
}
//#shared-store-usage

View file

@ -13,62 +13,64 @@ import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class ConversationRecoveryExample {
public static String PING = "PING";
public static String PONG = "PONG";
public static String PING = "PING";
public static String PONG = "PONG";
public static class Ping extends AbstractProcessor {
final ActorRef pongChannel = context().actorOf(Channel.props(), "pongChannel");
int counter = 0;
public static class Ping extends AbstractProcessor {
final ActorRef pongChannel = context().actorOf(Channel.props(), "pongChannel");
int counter = 0;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PING), cp -> {
counter += 1;
System.out.println(String.format("received ping %d times", counter));
cp.confirm();
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pongChannel.tell(Deliver.create(cp.withPayload(PONG), sender().path()), self());
}).
match(String.class,
s -> s.equals("init"),
s -> pongChannel.tell(Deliver.create(Persistent.create(PONG), sender().path()), self())).build();
}
public Ping() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PING), cp -> {
counter += 1;
System.out.println(String.format("received ping %d times", counter));
cp.confirm();
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pongChannel.tell(Deliver.create(cp.withPayload(PONG), sender().path()), self());
}).
match(String.class,
s -> s.equals("init"),
s -> pongChannel.tell(Deliver.create(Persistent.create(PONG), sender().path()), self())).build()
);
}
}
public static class Pong extends AbstractProcessor {
private final ActorRef pingChannel = context().actorOf(Channel.props(), "pingChannel");
private int counter = 0;
public static class Pong extends AbstractProcessor {
private final ActorRef pingChannel = context().actorOf(Channel.props(), "pingChannel");
private int counter = 0;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PONG), cp -> {
counter += 1;
System.out.println(String.format("received pong %d times", counter));
cp.confirm();
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pingChannel.tell(Deliver.create(cp.withPayload(PING), sender().path()), self());
}).build();
}
public Pong() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PONG), cp -> {
counter += 1;
System.out.println(String.format("received pong %d times", counter));
cp.confirm();
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pingChannel.tell(Deliver.create(cp.withPayload(PING), sender().path()), self());
}).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping");
final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong");
final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping");
final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong");
ping.tell("init", pong);
}
ping.tell("init", pong);
}
}

View file

@ -80,13 +80,15 @@ class ExampleProcessor extends AbstractEventsourcedProcessor {
return state.size();
}
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(Evt.class, state::update).
match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()).build();
}
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.match(Cmd.class, c -> {
final String data = c.getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());

View file

@ -14,45 +14,45 @@ import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class ProcessorChannelExample {
public static class ExampleProcessor extends AbstractProcessor {
private ActorRef destination;
private ActorRef channel;
public static class ExampleProcessor extends AbstractProcessor {
private ActorRef destination;
private ActorRef channel;
public ExampleProcessor(ActorRef destination) {
this.destination = destination;
this.channel = context().actorOf(Channel.props(), "channel");
}
public ExampleProcessor(ActorRef destination) {
this.destination = destination;
this.channel = context().actorOf(Channel.props(), "channel");
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
System.out.println("processed " + p.payload());
channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self());
}).
match(String.class, s -> System.out.println("reply = " + s)).build();
}
receive(ReceiveBuilder.
match(Persistent.class, p -> {
System.out.println("processed " + p.payload());
channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self());
}).
match(String.class, s -> System.out.println("reply = " + s)).build()
);
}
}
public static class ExampleDestination extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println("received " + cp.payload());
sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null);
cp.confirm();
}).build();
}
public static class ExampleDestination extends AbstractActor {
public ExampleDestination() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println("received " + cp.payload());
sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null);
cp.confirm();
}).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class));
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1");
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class));
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
Thread.sleep(1000);
system.shutdown();
}
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -17,56 +17,57 @@ import scala.runtime.BoxedUnit;
import java.util.ArrayList;
public class ProcessorFailureExample {
public static class ExampleProcessor extends AbstractProcessor {
private ArrayList<Object> received = new ArrayList<Object>();
public static class ExampleProcessor extends AbstractProcessor {
private ArrayList<Object> received = new ArrayList<Object>();
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> p.payload().equals("boom"), p -> {throw new RuntimeException("boom");}).
match(Persistent.class, p -> !p.payload().equals("boom"), p -> received.add(p.payload())).
match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}).
match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).build();
}
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage(((Persistent) message.get()).sequenceNr(), false);
}
super.preRestart(reason, message);
}
public ExampleProcessor() {
receive(ReceiveBuilder.
match(Persistent.class, p -> p.payload().equals("boom"), p -> {throw new RuntimeException("boom");}).
match(Persistent.class, p -> !p.payload().equals("boom"), p -> received.add(p.payload())).
match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}).
match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).build()
);
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2");
processor.tell(Persistent.create("a"), null);
processor.tell("print", null);
processor.tell("boom", null);
processor.tell("print", null);
processor.tell(Persistent.create("b"), null);
processor.tell("print", null);
processor.tell(Persistent.create("boom"), null);
processor.tell("print", null);
processor.tell(Persistent.create("c"), null);
processor.tell("print", null);
// Will print in a first run (i.e. with empty journal):
// received [a]
// received [a, b]
// received [a, b, c]
// Will print in a second run:
// received [a, b, c, a]
// received [a, b, c, a, b]
// received [a, b, c, a, b, c]
// etc ...
Thread.sleep(1000);
system.shutdown();
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage(((Persistent) message.get()).sequenceNr(), false);
}
super.preRestart(reason, message);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2");
processor.tell(Persistent.create("a"), null);
processor.tell("print", null);
processor.tell("boom", null);
processor.tell("print", null);
processor.tell(Persistent.create("b"), null);
processor.tell("print", null);
processor.tell(Persistent.create("boom"), null);
processor.tell("print", null);
processor.tell(Persistent.create("c"), null);
processor.tell("print", null);
// Will print in a first run (i.e. with empty journal):
// received [a]
// received [a, b]
// received [a, b, c]
// Will print in a second run:
// received [a, b, c, a]
// received [a, b, c, a, b]
// received [a, b, c, a, b, c]
// etc ...
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -18,62 +18,63 @@ import java.io.Serializable;
import java.util.ArrayList;
public class SnapshotExample {
public static class ExampleState implements Serializable {
private final ArrayList<String> received;
public static class ExampleState implements Serializable {
private final ArrayList<String> received;
public ExampleState() {
this(new ArrayList<String>());
}
public ExampleState(ArrayList<String> received) {
this.received = received;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(received));
}
public void update(String s) {
received.add(s);
}
@Override
public String toString() {
return received.toString();
}
public ExampleState() {
this(new ArrayList<String>());
}
public static class ExampleProcessor extends AbstractProcessor {
private ExampleState state = new ExampleState();
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> state.update(String.format("%s-%d", p.payload(), p.sequenceNr()))).
match(SnapshotOffer.class, s -> {
ExampleState exState = (ExampleState) s.snapshot();
System.out.println("offered state = " + exState);
state = exState;
}).
match(String.class, s -> s.equals("print"), s -> System.out.println("current state = " + state)).
match(String.class, s -> s.equals("snap"), s ->
// IMPORTANT: create a copy of snapshot
// because ExampleState is mutable !!!
saveSnapshot(state.copy())).build();
}
public ExampleState(ArrayList<String> received) {
this.received = received;
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
processor.tell("snap", null);
processor.tell(Persistent.create("c"), null);
processor.tell(Persistent.create("d"), null);
processor.tell("print", null);
Thread.sleep(1000);
system.shutdown();
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(received));
}
public void update(String s) {
received.add(s);
}
@Override
public String toString() {
return received.toString();
}
}
public static class ExampleProcessor extends AbstractProcessor {
private ExampleState state = new ExampleState();
public ExampleProcessor() {
receive(ReceiveBuilder.
match(Persistent.class, p -> state.update(String.format("%s-%d", p.payload(), p.sequenceNr()))).
match(SnapshotOffer.class, s -> {
ExampleState exState = (ExampleState) s.snapshot();
System.out.println("offered state = " + exState);
state = exState;
}).
match(String.class, s -> s.equals("print"), s -> System.out.println("current state = " + state)).
match(String.class, s -> s.equals("snap"), s ->
// IMPORTANT: create a copy of snapshot
// because ExampleState is mutable !!!
saveSnapshot(state.copy())).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
processor.tell("snap", null);
processor.tell(Persistent.create("c"), null);
processor.tell(Persistent.create("d"), null);
processor.tell("print", null);
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -17,84 +17,87 @@ import scala.runtime.BoxedUnit;
import java.util.concurrent.TimeUnit;
public class ViewExample {
public static class ExampleProcessor extends AbstractProcessor {
@Override
public String processorId() {
return "processor-5";
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class,
p -> System.out.println(String.format("processor received %s (sequence nr = %d)",
p.payload(),
p.sequenceNr()))).build();
}
public static class ExampleProcessor extends AbstractProcessor {
@Override
public String processorId() {
return "processor-5";
}
public static class ExampleView extends AbstractView {
private final ActorRef destination = context().actorOf(Props.create(ExampleDestination.class));
private final ActorRef channel = context().actorOf(Channel.props("channel"));
public ExampleProcessor() {
receive(ReceiveBuilder.
match(Persistent.class,
p -> System.out.println(String.format("processor received %s (sequence nr = %d)",
p.payload(),
p.sequenceNr()))).build()
);
}
}
private int numReplicated = 0;
public static class ExampleView extends AbstractView {
private final ActorRef destination = context().actorOf(Props.create(ExampleDestination.class));
private final ActorRef channel = context().actorOf(Channel.props("channel"));
@Override
public String viewId() {
return "view-5";
}
private int numReplicated = 0;
@Override
public String processorId() {
return "processor-5";
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
numReplicated += 1;
System.out.println(String.format("view received %s (sequence nr = %d, num replicated = %d)",
p.payload(),
p.sequenceNr(),
numReplicated));
channel.tell(Deliver.create(p.withPayload("replicated-" + p.payload()), destination.path()),
self());
}).
match(SnapshotOffer.class, so -> {
numReplicated = (Integer) so.snapshot();
System.out.println(String.format("view received snapshot offer %s (metadata = %s)",
numReplicated,
so.metadata()));
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build();
}
@Override
public String viewId() {
return "view-5";
}
public static class ExampleDestination extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println(String.format("destination received %s (sequence nr = %s)",
cp.payload(),
cp.sequenceNr()));
cp.confirm();
}).build();
}
@Override
public String processorId() {
return "processor-5";
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class));
final ActorRef view = system.actorOf(Props.create(ExampleView.class));
system.scheduler()
.schedule(Duration.Zero(),
Duration.create(2, TimeUnit.SECONDS),
processor,
Persistent.create("scheduled"),
system.dispatcher(),
null);
system.scheduler()
.schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null);
public ExampleView() {
receive(ReceiveBuilder.
match(Persistent.class, p -> {
numReplicated += 1;
System.out.println(String.format("view received %s (sequence nr = %d, num replicated = %d)",
p.payload(),
p.sequenceNr(),
numReplicated));
channel.tell(Deliver.create(p.withPayload("replicated-" + p.payload()), destination.path()),
self());
}).
match(SnapshotOffer.class, so -> {
numReplicated = (Integer) so.snapshot();
System.out.println(String.format("view received snapshot offer %s (metadata = %s)",
numReplicated,
so.metadata()));
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build()
);
}
}
public static class ExampleDestination extends AbstractActor {
public ExampleDestination() {
receive(ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println(String.format("destination received %s (sequence nr = %s)",
cp.payload(),
cp.sequenceNr()));
cp.confirm();
}).build()
);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class));
final ActorRef view = system.actorOf(Props.create(ExampleView.class));
system.scheduler()
.schedule(Duration.Zero(),
Duration.create(2, TimeUnit.SECONDS),
processor,
Persistent.create("scheduled"),
system.dispatcher(),
null);
system.scheduler()
.schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null);
}
}

View file

@ -61,15 +61,15 @@ class ArithmeticService extends AbstractLoggingActor {
}
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
ArithmeticService() {
receive(ReceiveBuilder.
match(Expression.class, expr -> {
// We delegate the dangerous task of calculation to a worker, passing the
// expression as a constructor argument to the actor.
ActorRef worker = context().actorOf(FlakyExpressionCalculator.props(expr, Left));
pendingWorkers.put(worker, sender());
}).
match(Result.class, r -> notifyConsumerSuccess(sender(), r.getValue())).build();
match(Result.class, r -> notifyConsumerSuccess(sender(), r.getValue())).build()
);
}
}
}

View file

@ -67,11 +67,6 @@ public class FlakyExpressionCalculator extends AbstractLoggingActor {
private final Expression expr;
private final Position myPosition;
public FlakyExpressionCalculator(Expression expr, Position myPosition) {
this.expr = expr;
this.myPosition = myPosition;
}
private Expression getExpr() {
return expr;
}
@ -107,9 +102,11 @@ public class FlakyExpressionCalculator extends AbstractLoggingActor {
}
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
public FlakyExpressionCalculator(Expression expr, Position myPosition) {
this.expr = expr;
this.myPosition = myPosition;
receive(ReceiveBuilder.
match(Result.class, r -> expected.contains(r.getPosition()), r -> {
expected.remove(r.getPosition());
results.put(r.getPosition(), r.getValue());
@ -126,7 +123,7 @@ public class FlakyExpressionCalculator extends AbstractLoggingActor {
throw new IllegalStateException("Expected results for positions " +
expected.stream().map(Object::toString).collect(Collectors.joining(", ")) +
" but got position " + r.getPosition());
}).build();
}).build());
}
private Integer evaluate(Expression expr, Integer left, Integer right) {