diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 8188bd6e23..f6e5b92201 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -69,11 +69,11 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { within(2 seconds) { import Logging._ verifyLevel(bus, InfoLevel) - bus.logLevel = WarningLevel + bus.setLogLevel(WarningLevel) verifyLevel(bus, WarningLevel) - bus.logLevel = DebugLevel + bus.setLogLevel(DebugLevel) verifyLevel(bus, DebugLevel) - bus.logLevel = ErrorLevel + bus.setLogLevel(ErrorLevel) verifyLevel(bus, ErrorLevel) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 44ff94329c..f1378db41a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -62,6 +62,8 @@ import java.util.concurrent.atomic.AtomicBoolean * val msg = ((Request3) o).getMsg(); * getSender().tell(other.ask(msg, 5000)); // reply with Future for holding the other’s reply (timeout 5 seconds) * + * } else { + * unhandled(o); * } * } * } diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index d2cb13b779..bd0fa8a7ce 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -124,8 +124,6 @@ trait SubchannelClassification { this: EventBus ⇒ @volatile private var cache = Map.empty[Classifier, Set[Subscriber]] - protected def subscribers = cache.values.flatten - /** * Returns the Classifier associated with the given Event */ diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 9f282402a6..bfd0f2a184 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -54,7 +54,7 @@ trait LoggingBus extends ActorEventBus { * will not participate in the automatic management of log level * subscriptions! */ - def logLevel_=(level: LogLevel): Unit = guard.withGuard { + def setLogLevel(level: LogLevel): Unit = guard.withGuard { for { l ← AllLogLevels // subscribe if previously ignored and now requested diff --git a/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala b/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala index 059df35cd2..9ed33873e5 100644 --- a/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala +++ b/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala @@ -3,7 +3,7 @@ package akka.event.japi import akka.event._ /** - * See documentation for akka.event.LookupClassification + * See documentation for [[akka.event.LookupClassification]] * E is the Event type * S is the Subscriber type * C is the Classifier type @@ -15,7 +15,19 @@ abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassificatio } /** - * See documentation for akka.event.ScanningClassification + * See documentation for [[akka.event.SubchannelClassification]] + * E is the Event type + * S is the Subscriber type + * C is the Classifier type + */ +abstract class SubchannelEventBus[E, S, C] extends EventBus with SubchannelClassification { + type Event = E + type Subscriber = S + type Classifier = C +} + +/** + * See documentation for [[akka.event.ScanningClassification]] * E is the Event type * S is the Subscriber type * C is the Classifier type @@ -27,12 +39,11 @@ abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassific } /** - * See documentation for akka.event.ActorClassification + * See documentation for [[akka.event.ActorClassification]] * An EventBus where the Subscribers are ActorRefs and the Classifier is ActorRef * Means that ActorRefs "listen" to other ActorRefs * E is the Event type */ - abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier { } diff --git a/akka-docs/general/addressing.rst b/akka-docs/general/addressing.rst index 135c3a5fb4..a9485fd1b2 100644 --- a/akka-docs/general/addressing.rst +++ b/akka-docs/general/addressing.rst @@ -3,6 +3,10 @@ Actor References, Paths and Addresses ===================================== +.. sidebar:: Contents + + .. contents:: :local: + This chapter describes how actors are identified and located within a possibly distributed actor system. It ties into the central idea that :ref:`actor-systems` form intrinsic supervision hierarchies as well as that @@ -225,6 +229,23 @@ extracting the sender references, and then watch all discovered concrete actors. This scheme of resolving a selection may be improved upon in a future release. +.. _actorOf-vs-actorFor: + +Summary: ``actorOf`` vs. ``actorFor`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. note:: + + What the above sections described in some detail can be summarized and + memorized easily as follows: + + - ``actorOf`` only ever creates a new actor, and it creates it as a direct + child of the context on which this method is invoked (which may be any + actor or actor system). + + - ``actorFor`` only ever looks up an existing actor, i.e. does not create + one. + The Interplay with Remote Deployment ------------------------------------ diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst index be521bc933..3cb046d364 100644 --- a/akka-docs/general/configuration.rst +++ b/akka-docs/general/configuration.rst @@ -180,3 +180,40 @@ Logging of Configuration If the system or config property ``akka.logConfigOnStart`` is set to ``on``, then the complete configuration at INFO level when the actor system is started. This is useful when you are uncertain of what configuration is used. + +If in doubt, you can also easily and nicely inspect configuration objects +before or after using them to construct an actor system: + +.. code-block:: scala + + Welcome to Scala version 2.9.1.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_27). + Type in expressions to have them evaluated. + Type :help for more information. + + scala> import com.typesafe.config._ + import com.typesafe.config._ + + scala> ConfigFactory.parseString("a.b=12") + res0: com.typesafe.config.Config = Config(SimpleConfigObject({"a" : {"b" : 12}})) + + scala> res0.root.render + res1: java.lang.String = + { + # String: 1 + "a" : { + # String: 1 + "b" : 12 + } + } + +The comments preceding every item give detailed information about the origin of +the setting (file & line number) plus possible comments which were present, +e.g. in the reference configuration. The settings as merged with the reference +and parsed by the actor system can be displayed like this: + +.. code-block:: java + + final ActorSystem system = ActorSystem.create(); + println(system.settings()); + // this is a shortcut for system.settings().config().root().render() + diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java index 488d72a3b4..132dc990ee 100644 --- a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -39,6 +39,8 @@ public class FaultHandlingTestBase { public void onReceive(Object o) { if (o instanceof Props) { getSender().tell(getContext().actorOf((Props) o)); + } else { + unhandled(o); } } } @@ -49,6 +51,8 @@ public class FaultHandlingTestBase { public void onReceive(Object o) { if (o instanceof Props) { getSender().tell(getContext().actorOf((Props) o)); + } else { + unhandled(o); } } @@ -70,6 +74,8 @@ public class FaultHandlingTestBase { state = (Integer) o; } else if (o.equals("get")) { getSender().tell(state); + } else { + unhandled(o); } } } diff --git a/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java b/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java index 1998c45a76..f9d1ad8799 100644 --- a/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java @@ -69,6 +69,8 @@ public class SchedulerDocTestBase { public void onReceive(Object message) { if (message.equals("Tick")) { // Do someting + } else { + unhandled(message); } } }; diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index 27a7232df6..d442ae6461 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -24,6 +24,10 @@ import static akka.actor.Actors.*; import akka.japi.Procedure; //#import-procedure +//#import-watch +import akka.actor.Terminated; +//#import-watch + import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; @@ -161,6 +165,15 @@ public class UntypedActorDocTestBase { system.shutdown(); } + @Test + public void useWatch() { + ActorSystem system = ActorSystem.create("MySystem"); + ActorRef myActor = system.actorOf(new Props(WatchActor.class)); + Future future = myActor.ask("kill", 1000); + assert Await.result(future, Duration.parse("1 second")).equals("finished"); + system.shutdown(); + } + public static class MyActor extends UntypedActor { public MyActor(String s) { @@ -246,9 +259,36 @@ public class UntypedActorDocTestBase { getContext().become(angry); } else if (message.equals("foo")) { getContext().become(happy); + } else { + unhandled(message); } } } //#hot-swap-actor + //#watch + public static class WatchActor extends UntypedActor { + final ActorRef child = this.getContext().actorOf(Props.empty(), "child"); + { + this.getContext().watch(child); // <-- this is the only call needed for registration + } + ActorRef lastSender = getContext().system().deadLetters(); + + @Override + public void onReceive(Object message) { + if (message.equals("kill")) { + getContext().stop(child); + lastSender = getSender(); + } else if (message instanceof Terminated) { + final Terminated t = (Terminated) message; + if (t.getActor() == child) { + lastSender.tell("finished"); + } + } else { + unhandled(message); + } + } + } + //#watch + } diff --git a/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java b/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java index 160f5803ec..de84ce2c50 100644 --- a/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java +++ b/akka-docs/java/code/akka/docs/event/LoggingDocTestBase.java @@ -23,11 +23,14 @@ import org.junit.Test; import scala.Option; import static org.junit.Assert.*; +import akka.actor.UntypedActorFactory; +//#imports-deadletter import akka.actor.Props; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; +import akka.actor.DeadLetter; +//#imports-deadletter public class LoggingDocTestBase { @@ -43,6 +46,16 @@ public class LoggingDocTestBase { system.shutdown(); } + @Test + public void subscribeToDeadLetters() { + //#deadletters + final ActorSystem system = ActorSystem.create("DeadLetters"); + final ActorRef actor = system.actorOf(new Props(DeadLetterActor.class)); + system.eventStream().subscribe(actor, DeadLetter.class); + //#deadletters + system.shutdown(); + } + //#my-actor class MyActor extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); @@ -87,4 +100,14 @@ public class LoggingDocTestBase { } //#my-event-listener + //#deadletter-actor + public static class DeadLetterActor extends UntypedActor { + public void onReceive(Object message) { + if (message instanceof DeadLetter) { + System.out.println(message); + } + } + } + //#deadletter-actor + } diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index 19c6c36641..c3278f23bd 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -290,6 +290,8 @@ public class FutureDocTestBase { } else { getSender().tell(i); } + } else { + unhandled(message); } } } diff --git a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java index 84bd33cb3b..dca10b8984 100644 --- a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java @@ -34,6 +34,8 @@ public class CoordinatedCounter extends UntypedActor { } } else if ("GetCount".equals(incoming)) { getSender().tell(count.single().get()); + } else { + unhandled(incoming); } } } diff --git a/akka-docs/java/code/akka/docs/transactor/Coordinator.java b/akka-docs/java/code/akka/docs/transactor/Coordinator.java index 37d7c935cb..6854ed99f6 100644 --- a/akka-docs/java/code/akka/docs/transactor/Coordinator.java +++ b/akka-docs/java/code/akka/docs/transactor/Coordinator.java @@ -22,6 +22,8 @@ public class Coordinator extends UntypedActor { }); //#coordinated-atomic } + } else { + unhandled(incoming); } } } diff --git a/akka-docs/java/event-bus.rst b/akka-docs/java/event-bus.rst new file mode 100644 index 0000000000..f97156e9e3 --- /dev/null +++ b/akka-docs/java/event-bus.rst @@ -0,0 +1,211 @@ +.. _event-bus-java: + +################ +Event Bus (Java) +################ + +.. sidebar:: Contents + + .. contents:: :local: + +Originally conceived as a way to send messages to groups of actors, the +:class:`EventBus` has been generalized into a set of composable traits +implementing a simple interface: + +- :meth:`public boolean subscribe(S subscriber, C classifier)` subscribes the + given subscriber to events with the given classifier + +- :meth:`public boolean unsubscribe(S subscriber, C classifier)` undoes a + specific subscription + +- :meth:`public void unsubscribe(S subscriber)` undoes all subscriptions for + the given subscriber + +- :meth:`public void publish(E event)` publishes an event, which first is classified + according to the specific bus (see `Classifiers`_) and then published to all + subscribers for the obtained classifier + +This mechanism is used in different places within Akka, e.g. the +:ref:`DeathWatch ` and the `Event Stream`_. Implementations +can make use of the specific building blocks presented below. + +An event bus must define the following three abstract types: + +- :class:`E` is the type of all events published on that bus + +- :class:`S` is the type of subscribers allowed to register on that event bus + +- :class:`C` defines the classifier to be used in selecting subscribers for + dispatching events + +The traits below are still generic in these types, but they need to be defined +for any concrete implementation. + +Classifiers +=========== + +The classifiers presented here are part of the Akka distribution, but rolling +your own in case you do not find a perfect match is not difficult, check the +implementation of the existing ones on `github`_. + +.. _github: https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/event/EventBus.scala + +Lookup Classification +--------------------- + +The simplest classification is just to extract an arbitrary classifier from +each event and maintaining a set of subscribers for each possible classifier. +This can be compared to tuning in on a radio station. The abstract class +:class:`LookupEventBus` is still generic in that it abstracts over how to +compare subscribers and how exactly to classify. The necessary methods to be +implemented are the following: + +- :meth:`public C classify(E event)` is used for extracting the + classifier from the incoming events. + +- :meth:`public int compareSubscribers(S a, S b)` must define a + partial order over the subscribers, expressed as expected from + :meth:`java.lang.Comparable.compare`. + +- :meth:`public void publish(E event, S subscriber)` will be invoked for + each event for all subscribers which registered themselves for the event’s + classifier. + +- :meth:`public int mapSize()` determines the initial size of the index data structure + used internally (i.e. the expected number of different classifiers). + +This classifier is efficient in case no subscribers exist for a particular event. + +Subchannel Classification +------------------------- + +If classifiers form a hierarchy and it is desired that subscription be possible +not only at the leaf nodes, this classification may be just the right one. It +can be compared to tuning in on (possibly multiple) radio channels by genre. +This classification has been developed for the case where the classifier is +just the JVM class of the event and subscribers may be interested in +subscribing to all subclasses of a certain class, but it may be used with any +classifier hierarchy. The abstract members needed by this classifier are + +- :meth:`public Subclassification[C] subclassification()` provides an object + providing :meth:`isEqual(a: Classifier, b: Classifier)` and + :meth:`isSubclass(a: Classifier, b: Classifier)` to be consumed by the other + methods of this classifier; this method is called on various occasions, it + should be implemented so that it always returns the same object for + performance reasons. + +- :meth:`public C classify(E event)` is used for extracting the classifier from + the incoming events. + +- :meth:`public void publish(E event, S subscriber)` will be invoked for + each event for all subscribers which registered themselves for the event’s + classifier. + +This classifier is also efficient in case no subscribers are found for an +event, but it uses conventional locking to synchronize an internal classifier +cache, hence it is not well-suited to use cases in which subscriptions change +with very high frequency (keep in mind that “opening” a classifier by sending +the first message will also have to re-check all previous subscriptions). + +Scanning Classification +----------------------- + +The previous classifier was built for multi-classifier subscriptions which are +strictly hierarchical, this classifier is useful if there are overlapping +classifiers which cover various parts of the event space without forming a +hierarchy. It can be compared to tuning in on (possibly multiple) radio +stations by geographical reachability (for old-school radio-wave transmission). +The abstract members for this classifier are: + +- :meth:`public int compareClassifiers(C a, C b)` is needed for + determining matching classifiers and storing them in an ordered collection. + +- :meth:`public int compareSubscribers(S a, S b)` is needed for + storing subscribers in an ordered collection. + +- :meth:`public boolean matches(C classifier, E event)` determines + whether a given classifier shall match a given event; it is invoked for each + subscription for all received events, hence the name of the classifier. + +- :meth:`public void publish(E event, S subscriber)` will be invoked for + each event for all subscribers which registered themselves for a classifier + matching this event. + +This classifier takes always a time which is proportional to the number of +subscriptions, independent of how many actually match. + +Actor Classification +-------------------- + +This classification has been developed specifically for implementing +:ref:`DeathWatch `: subscribers as well as classifiers are of +type :class:`ActorRef`. The abstract members are + +- :meth:`public ActorRef classify(E event)` is used for extracting the + classifier from the incoming events. + +- :meth:`public int mapSize()` determines the initial size of the index data structure + used internally (i.e. the expected number of different classifiers). + +This classifier is still is generic in the event type, and it is efficient for +all use cases. + +.. _event-stream-java: + +Event Stream +============ + +The event stream is the main event bus of each actor system: it is used for +carrying :ref:`log messages ` and `Dead Letters`_ and may be +used by the user code for other purposes as well. It uses `Subchannel +Classification`_ which enables registering to related sets of channels (as is +used for :class:`RemoteLifeCycleMessage`). The following example demonstrates +how a simple subscription works. Given a simple actor: + +.. includecode:: code/akka/docs/event/LoggingDocTestBase.java#imports-deadletter +.. includecode:: code/akka/docs/event/LoggingDocTestBase.java#deadletter-actor + +it can be subscribed like this: + +.. includecode:: code/akka/docs/event/LoggingDocTestBase.java#deadletters + +Default Handlers +---------------- + +Upon start-up the actor system creates and subscribes actors to the event +stream for logging: these are the handlers which are configured for example in +``application.conf``: + +.. code-block:: text + + akka { + event-handlers = ["akka.event.Logging$DefaultLogger"] + } + +The handlers listed here by fully-qualified class name will be subscribed to +all log event classes with priority higher than or equal to the configured +log-level and their subscriptions are kept in sync when changing the log-level +at runtime:: + + system.eventStream.setLogLevel(Logging.DebugLevel()); + +This means that log events for a level which will not be logged are not +typically not dispatched at all (unless manual subscriptions to the respective +event class have been done) + +Dead Letters +------------ + +As described at :ref:`stopping-actors-java`, messages queued when an actor +terminates or sent after its death are re-routed to the dead letter mailbox, +which by default will publish the messages wrapped in :class:`DeadLetter`. This +wrapper holds the original sender, receiver and message of the envelope which +was redirected. + +Other Uses +---------- + +The event stream is always there and ready to be used, just publish your own +events (it accepts ``Object``) and subscribe listeners to the corresponding JVM +classes. + diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index faed5362e6..4b0226fc35 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -9,6 +9,7 @@ Java API untyped-actors typed-actors logging + event-bus scheduler futures dataflow diff --git a/akka-docs/java/remoting.rst b/akka-docs/java/remoting.rst index 811aae8587..6e1e41e663 100644 --- a/akka-docs/java/remoting.rst +++ b/akka-docs/java/remoting.rst @@ -54,6 +54,8 @@ As you can see from the example above the following pattern is used to find an ` akka://@:/ +For more details on how actor addresses and paths are formed and used, please refer to :ref:`addressing`. + Creating Actors Remotely ^^^^^^^^^^^^^^^^^^^^^^^^ @@ -113,3 +115,108 @@ This is also done via configuration:: This configuration setting will clone the actor “aggregation” 10 times and deploy it evenly distributed across the two given target nodes. + +Description of the Remoting Sample +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The sample application included with the Akka sources demonstrates both, remote +deployment and look-up of remote actors. First, let us have a look at the +common setup for both scenarios (this is ``common.conf``): + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/common.conf + +This enables the remoting by installing the :class:`RemoteActorRefProvider` and +chooses the default remote transport. All other options will be set +specifically for each show case. + +.. note:: + + Be sure to replace the default IP 127.0.0.1 with the real address the system + is reachable by if you deploy onto multiple machines! + +.. _remote-lookup-sample-java: + +Remote Lookup +------------- + +In order to look up a remote actor, that one must be created first. For this +purpose, we configure an actor system to listen on port 2552 (this is a snippet +from ``application.conf``): + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/application.conf + :include: calculator + +Then the actor must be created. For all code which follows, assume these imports: + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java + :include: imports + +The actor doing the work will be this one: + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java + :include: actor + +and we start it within an actor system using the above configuration + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java + :include: setup + +With the service actor up and running, we may look it up from another actor +system, which will be configured to use port 2553 (this is a snippet from +``application.conf``). + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/application.conf + :include: remotelookup + +The actor which will query the calculator is a quite simple one for demonstration purposes + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java + :include: actor + +and it is created from an actor system using the aforementioned client’s config. + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java + :include: setup + +Requests which come in via ``doSomething`` will be sent to the client actor +along with the reference which was looked up earlier. Observe how the actor +system name using in ``actorFor`` matches the remote system’s name, as do IP +and port number. Top-level actors are always created below the ``"/user"`` +guardian, which supervises them. + +Remote Deployment +----------------- + +Creating remote actors instead of looking them up is not visible in the source +code, only in the configuration file. This section is used in this scenario +(this is a snippet from ``application.conf``): + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/resources/application.conf + :include: remotecreation + +For all code which follows, assume these imports: + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java + :include: imports + +The server actor can multiply or divide numbers: + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java + :include: actor + +The client actor looks like in the previous example + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java + :include: actor + +but the setup uses only ``actorOf``: + +.. includecode:: ../../akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java + :include: setup + +Observe how the name of the server actor matches the deployment given in the +configuration file, which will transparently delegate the actor creation to the +remote node. + + + diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 2b9a797b57..b24b1d6e6c 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -28,6 +28,10 @@ its syntax from Erlang. Creating Actors =============== +Since Akka enforces parental supervision every actor is supervised and +(potentially) the supervisor of its children; it is advisable that you +familiarize yourself with :ref:`actor-systems` and :ref:`supervision` and it +may also help to read :ref:`actorOf-vs-actorFor`. Defining an Actor class ----------------------- @@ -131,6 +135,7 @@ In addition, it offers: * system that the actor belongs to * parent supervisor * supervised children + * lifecycle monitoring * hotswap behavior stack as described in :ref:`UntypedActor.HotSwap` The remaining visible methods are user-overridable life-cycle hooks which are @@ -141,6 +146,36 @@ described in the following: The implementations shown above are the defaults provided by the :class:`UntypedActor` class. +.. _deathwatch-java: + +Lifecycle Monitoring aka DeathWatch +----------------------------------- + +In order to be notified when another actor terminates (i.e. stops permanently, +not temporary failure and restart), an actor may register itself for reception +of the :class:`Terminated` message dispatched by the other actor upon +termination (see `Stopping Actors`_). This service is provided by the +:class:`DeathWatch` component of the actor system. + +Registering a monitor is easy (see fourth line, the rest is for demonstrating +the whole functionality): + +.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java#watch + +It should be noted that the :class:`Terminated` message is generated +independent of the order in which registration and termination occur. +Registering multiple times does not necessarily lead to multiple messages being +generated, but there is no guarantee that only exactly one such message is +received: if termination of the watched actor has generated and queued the +message, and another registration is done before this message has been +processed, then a second message will be queued, because registering for +monitoring of an already terminated actor leads to the immediate generation of +the :class:`Terminated` message. + +It is also possible to deregister from watching another actor’s liveliness +using ``context.unwatch(target)``, but obviously this cannot guarantee +non-reception of the :class:`Terminated` message because that may already have +been queued. Start Hook ---------- @@ -225,8 +260,8 @@ Remote actor addresses may also be looked up, if remoting is enabled:: These look-ups return a (possibly remote) actor reference immediately, so you will have to send to it and await a reply in order to verify that ``serviceB`` -is actually reachable and running. - +is actually reachable and running. An example demonstrating actor look-up is +given in :ref:`remote-lookup-sample-java`. Messages and immutability ========================= @@ -383,6 +418,8 @@ message. .. includecode:: code/akka/docs/actor/MyReceivedTimeoutUntypedActor.java#receive-timeout +.. _stopping-actors-java: + Stopping actors =============== @@ -397,21 +434,44 @@ but additional messages in the mailbox will not be processed. By default these messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that depends on the mailbox implementation. -When stop is called then a call to the ``def postStop`` callback method will -take place. The ``Actor`` can use this callback to implement shutdown behavior. +Termination of an actor proceeds in two steps: first the actor suspends its +mailbox processing and sends a stop command to all its children, then it keeps +processing the termination messages from its children until the last one is +gone, finally terminating itself (invoking :meth:`postStop`, dumping mailbox, +publishing :class:`Terminated` on the :ref:`DeathWatch `, telling +its supervisor). This procedure ensures that actor system sub-trees terminate +in an orderly fashion, propagating the stop command to the leaves and +collecting their confirmation back to the stopped supervisor. If one of the +actors does not respond (i.e. processing a message for extended periods of time +and therefore not receiving the stop command), this whole process will be +stuck. + +It is possible to disregard specific children with respect to shutdown +confirmation by stopping them explicitly before issuing the +``context.stop(self)``:: + + context.stop(someChild); + context.stop(self); + +In this case ``someChild`` will be stopped asynchronously and re-parented to +the :class:`Locker`, where :class:`DavyJones` will keep tabs and dispose of it +eventually. + +Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be +stopped, and the aforementioned process will ensure proper termination of the +whole system. + +The :meth:`postStop()` hook is invoked after an actor is fully stopped. This +enables cleaning up of resources: .. code-block:: java + @Override public void postStop() { - ... // clean up resources + // close some file or database connection } -All Actors are stopped when the ``ActorSystem`` is stopped. -Supervised actors are stopped when the supervisor is stopped, i.e. children are stopped -when parent is stopped. - - PoisonPill ---------- @@ -420,9 +480,6 @@ stop the actor when the message is processed. ``PoisonPill`` is enqueued as ordinary messages and will be handled after messages that were already queued in the mailbox. -If the ``PoisonPill`` was sent with ``ask``, the ``Future`` will be completed with an -``akka.actor.ActorKilledException("PoisonPill")``. - Use it like this: .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index 3e815f11ea..af66da471c 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -12,3 +12,18 @@ changes in client code. This API cleanup is planned to be the last one for a significant amount of time. Detailed migration guide will be written. + +Unordered Collection of Migration Items +======================================= + +``ActorRef.ask()`` +------------------ + +The mechanism for collecting an actor’s reply in a :class:`Future` has been +reworked for better location transparency: it uses an actor under the hood. +This actor needs to be disposable by the garbage collector in case no reply is +ever received, and the decision is based upon a timeout. This timeout +determines when the actor will stop itself and hence closes the window for a +reply to be received; it is independent of the timeout applied when awaiting +completion of the :class:`Future`, however, the actor will complete the +:class:`Future` with an :class:`AskTimeoutException` when it stops itself. diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 12d368d148..204aa3ce56 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -28,6 +28,10 @@ its syntax from Erlang. Creating Actors =============== +Since Akka enforces parental supervision every actor is supervised and +(potentially) the supervisor of its children; it is advisable that you +familiarize yourself with :ref:`actor-systems` and :ref:`supervision` and it +may also help to read :ref:`actorOf-vs-actorFor`. Defining an Actor class ----------------------- @@ -156,6 +160,7 @@ In addition, it offers: * system that the actor belongs to * parent supervisor * supervised children + * lifecycle monitoring * hotswap behavior stack as described in :ref:`Actor.HotSwap` You can import the members in the :obj:`context` to avoid prefixing access with ``context.`` @@ -176,6 +181,35 @@ described in the following:: The implementations shown above are the defaults provided by the :class:`Actor` trait. +.. _deathwatch-scala: + +Lifecycle Monitoring aka DeathWatch +----------------------------------- + +In order to be notified when another actor terminates (i.e. stops permanently, +not temporary failure and restart), an actor may register itself for reception +of the :class:`Terminated` message dispatched by the other actor upon +termination (see `Stopping Actors`_). This service is provided by the +:class:`DeathWatch` component of the actor system. + +Registering a monitor is easy: + +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#watch + +It should be noted that the :class:`Terminated` message is generated +independent of the order in which registration and termination occur. +Registering multiple times does not necessarily lead to multiple messages being +generated, but there is no guarantee that only exactly one such message is +received: if termination of the watched actor has generated and queued the +message, and another registration is done before this message has been +processed, then a second message will be queued, because registering for +monitoring of an already terminated actor leads to the immediate generation of +the :class:`Terminated` message. + +It is also possible to deregister from watching another actor’s liveliness +using ``context.unwatch(target)``, but obviously this cannot guarantee +non-reception of the :class:`Terminated` message because that may already have +been queued. Start Hook ---------- @@ -258,7 +292,8 @@ Remote actor addresses may also be looked up, if remoting is enabled:: These look-ups return a (possibly remote) actor reference immediately, so you will have to send to it and await a reply in order to verify that ``serviceB`` -is actually reachable and running. +is actually reachable and running. An example demonstrating actor look-up is +given in :ref:`remote-lookup-sample-scala`. Messages and immutability ========================= @@ -442,6 +477,7 @@ object. .. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-timeout +.. _stopping-actors-scala: Stopping actors =============== @@ -457,19 +493,42 @@ but additional messages in the mailbox will not be processed. By default these messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that depends on the mailbox implementation. -When stop is called then a call to the ``def postStop`` callback method will -take place. The ``Actor`` can use this callback to implement shutdown behavior. +Termination of an actor proceeds in two steps: first the actor suspends its +mailbox processing and sends a stop command to all its children, then it keeps +processing the termination messages from its children until the last one is +gone, finally terminating itself (invoking :meth:`postStop`, dumping mailbox, +publishing :class:`Terminated` on the :ref:`DeathWatch `, telling +its supervisor). This procedure ensures that actor system sub-trees terminate +in an orderly fashion, propagating the stop command to the leaves and +collecting their confirmation back to the stopped supervisor. If one of the +actors does not respond (i.e. processing a message for extended periods of time +and therefore not receiving the stop command), this whole process will be +stuck. + +It is possible to disregard specific children with respect to shutdown +confirmation by stopping them explicitly before issuing the +``context.stop(self)``:: + + context.stop(someChild) + context.stop(self) + +In this case ``someChild`` will be stopped asynchronously and re-parented to +the :class:`Locker`, where :class:`DavyJones` will keep tabs and dispose of it +eventually. + +Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be +stopped, and the aforementioned process will ensure proper termination of the +whole system. + +The :meth:`postStop()` hook is invoked after an actor is fully stopped. This +enables cleaning up of resources: .. code-block:: scala override def postStop() = { - ... // clean up resources + // close some file or database connection } -All Actors are stopped when the ``ActorSystem`` is stopped. -Supervised actors are stopped when the supervisor is stopped, i.e. children are stopped -when parent is stopped. - PoisonPill ---------- @@ -479,10 +538,6 @@ stop the actor when the message is processed. ``PoisonPill`` is enqueued as ordinary messages and will be handled after messages that were already queued in the mailbox. -If the ``PoisonPill`` was sent with ``?``, the ``Future`` will be completed with an -``akka.actor.ActorKilledException("PoisonPill")``. - - .. _Actor.HotSwap: Become/Unbecome diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 06d32609d5..cdba3d07f3 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -296,4 +296,25 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val actor = system.actorOf(Props(new HotSwapActor), name = "hot") } + + "using watch" in { + //#watch + import akka.actor.{ Actor, Props, Terminated } + + class WatchActor extends Actor { + val child = context.actorOf(Props.empty, "child") + context.watch(child) // <-- this is the only call needed for registration + var lastSender = system.deadLetters + + def receive = { + case "kill" ⇒ context.stop(child); lastSender = sender + case Terminated(`child`) ⇒ lastSender ! "finished" + } + } + //#watch + val a = system.actorOf(Props(new WatchActor)) + implicit val sender = testActor + a ! "kill" + expectMsg("finished") + } } diff --git a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala index 04dfa2ec71..ffa56a3064 100644 --- a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala @@ -58,4 +58,17 @@ class LoggingDocSpec extends AkkaSpec { myActor ! "test" } + "allow registration to dead letters" in { + //#deadletters + import akka.actor.{ Actor, DeadLetter, Props } + + val listener = system.actorOf(Props(new Actor { + def receive = { + case d: DeadLetter ⇒ println(d) + } + })) + system.eventStream.subscribe(listener, classOf[DeadLetter]) + //#deadletters + } + } diff --git a/akka-docs/scala/event-bus.rst b/akka-docs/scala/event-bus.rst new file mode 100644 index 0000000000..633bd0ca64 --- /dev/null +++ b/akka-docs/scala/event-bus.rst @@ -0,0 +1,205 @@ +.. _event-bus-scala: + +################# +Event Bus (Scala) +################# + +.. sidebar:: Contents + + .. contents:: :local: + +Originally conceived as a way to send messages to groups of actors, the +:class:`EventBus` has been generalized into a set of composable traits +implementing a simple interface: + +- :meth:`subscribe(subscriber: Subscriber, classifier: Classifier): Boolean` + subscribes the given subscriber to events with the given classifier + +- :meth:`unsubscribe(subscriber: Subscriber, classifier: Classifier): Boolean` + undoes a specific subscription + +- :meth:`unsubscribe(subscriber: Subscriber)` undoes all subscriptions for the + given subscriber + +- :meth:`publish(event: Event)` publishes an event, which first is classified + according to the specific bus (see `Classifiers`_) and then published to all + subscribers for the obtained classifier + +This mechanism is used in different places within Akka, e.g. the +:ref:`DeathWatch ` and the `Event Stream`_. Implementations +can make use of the specific building blocks presented below. + +An event bus must define the following three abstract types: + +- :class:`Event` is the type of all events published on that bus + +- :class:`Subscriber` is the type of subscribers allowed to register on that + event bus + +- :class:`Classifier` defines the classifier to be used in selecting + subscribers for dispatching events + +The traits below are still generic in these types, but they need to be defined +for any concrete implementation. + +Classifiers +=========== + +The classifiers presented here are part of the Akka distribution, but rolling +your own in case you do not find a perfect match is not difficult, check the +implementation of the existing ones on `github`_. + +.. _github: https://github.com/jboner/akka/blob/master/akka-actor/src/main/scala/akka/event/EventBus.scala + +Lookup Classification +--------------------- + +The simplest classification is just to extract an arbitrary classifier from +each event and maintaining a set of subscribers for each possible classifier. +This can be compared to tuning in on a radio station. The trait +:class:`LookupClassification` is still generic in that it abstracts over how to +compare subscribers and how exactly to classify. The necessary methods to be +implemented are the following: + +- :meth:`classify(event: Event): Classifier` is used for extracting the + classifier from the incoming events. + +- :meth:`compareSubscribers(a: Subscriber, b: Subscriber): Int` must define a + partial order over the subscribers, expressed as expected from + :meth:`java.lang.Comparable.compare`. + +- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for + each event for all subscribers which registered themselves for the event’s + classifier. + +- :meth:`mapSize: Int` determines the initial size of the index data structure + used internally (i.e. the expected number of different classifiers). + +This classifier is efficient in case no subscribers exist for a particular event. + +Subchannel Classification +------------------------- + +If classifiers form a hierarchy and it is desired that subscription be possible +not only at the leaf nodes, this classification may be just the right one. It +can be compared to tuning in on (possibly multiple) radio channels by genre. +This classification has been developed for the case where the classifier is +just the JVM class of the event and subscribers may be interested in +subscribing to all subclasses of a certain class, but it may be used with any +classifier hierarchy. The abstract members needed by this classifier are + +- :obj:`subclassification: Subclassification[Classifier]` is an object + providing :meth:`isEqual(a: Classifier, b: Classifier)` and + :meth:`isSubclass(a: Classifier, b: Classifier)` to be consumed by the other + methods of this classifier. + +- :meth:`classify(event: Event): Classifier` is used for extracting the + classifier from the incoming events. + +- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for + each event for all subscribers which registered themselves for the event’s + classifier. + +This classifier is also efficient in case no subscribers are found for an +event, but it uses conventional locking to synchronize an internal classifier +cache, hence it is not well-suited to use cases in which subscriptions change +with very high frequency (keep in mind that “opening” a classifier by sending +the first message will also have to re-check all previous subscriptions). + +Scanning Classification +----------------------- + +The previous classifier was built for multi-classifier subscriptions which are +strictly hierarchical, this classifier is useful if there are overlapping +classifiers which cover various parts of the event space without forming a +hierarchy. It can be compared to tuning in on (possibly multiple) radio +stations by geographical reachability (for old-school radio-wave transmission). +The abstract members for this classifier are: + +- :meth:`compareClassifiers(a: Classifier, b: Classifier): Int` is needed for + determining matching classifiers and storing them in an ordered collection. + +- :meth:`compareSubscribers(a: Subscriber, b: Subscriber): Int` is needed for + storing subscribers in an ordered collection. + +- :meth:`matches(classifier: Classifier, event: Event): Boolean` determines + whether a given classifier shall match a given event; it is invoked for each + subscription for all received events, hence the name of the classifier. + +- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for + each event for all subscribers which registered themselves for a classifier + matching this event. + +This classifier takes always a time which is proportional to the number of +subscriptions, independent of how many actually match. + +Actor Classification +-------------------- + +This classification has been developed specifically for implementing +:ref:`DeathWatch `: subscribers as well as classifiers are of +type :class:`ActorRef`. The abstract members are + +- :meth:`classify(event: Event): ActorRef` is used for extracting the + classifier from the incoming events. + +- :meth:`mapSize: Int` determines the initial size of the index data structure + used internally (i.e. the expected number of different classifiers). + +This classifier is still is generic in the event type, and it is efficient for +all use cases. + +.. _event-stream-scala: + +Event Stream +============ + +The event stream is the main event bus of each actor system: it is used for +carrying :ref:`log messages ` and `Dead Letters`_ and may be +used by the user code for other purposes as well. It uses `Subchannel +Classification`_ which enables registering to related sets of channels (as is +used for :class:`RemoteLifeCycleMessage`). The following example demonstrates +how a simple subscription works: + +.. includecode:: code/akka/docs/event/LoggingDocSpec.scala#deadletters + +Default Handlers +---------------- + +Upon start-up the actor system creates and subscribes actors to the event +stream for logging: these are the handlers which are configured for example in +``application.conf``: + +.. code-block:: text + + akka { + event-handlers = ["akka.event.Logging$DefaultLogger"] + } + +The handlers listed here by fully-qualified class name will be subscribed to +all log event classes with priority higher than or equal to the configured +log-level and their subscriptions are kept in sync when changing the log-level +at runtime:: + + system.eventStream.setLogLevel(Logging.DebugLevel) + +This means that log events for a level which will not be logged are not +typically not dispatched at all (unless manual subscriptions to the respective +event class have been done) + +Dead Letters +------------ + +As described at :ref:`stopping-actors-scala`, messages queued when an actor +terminates or sent after its death are re-routed to the dead letter mailbox, +which by default will publish the messages wrapped in :class:`DeadLetter`. This +wrapper holds the original sender, receiver and message of the envelope which +was redirected. + +Other Uses +---------- + +The event stream is always there and ready to be used, just publish your own +events (it accepts ``AnyRef``) and subscribe listeners to the corresponding JVM +classes. + diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index c9cb6460f8..803dd26799 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -9,6 +9,7 @@ Scala API actors typed-actors logging + event-bus scheduler futures dataflow diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index 241ae31510..d0b613b190 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -70,6 +70,8 @@ Once you a reference to the actor you can interact with it they same way you wou actor ! "Pretty awesome feature" +For more details on how actor addresses and paths are formed and used, please refer to :ref:`addressing`. + Creating Actors Remotely ^^^^^^^^^^^^^^^^^^^^^^^^ @@ -151,6 +153,13 @@ This enables the remoting by installing the :class:`RemoteActorRefProvider` and chooses the default remote transport. All other options will be set specifically for each show case. +.. note:: + + Be sure to replace the default IP 127.0.0.1 with the real address the system + is reachable by if you deploy onto multiple machines! + +.. _remote-lookup-sample-scala: + Remote Lookup ------------- diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java index e60ac7abdb..2f4084e3f1 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java @@ -5,19 +5,24 @@ package sample.remote.calculator.java; import akka.actor.UntypedActor; +//#actor public class JAdvancedCalculatorActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { + if (message instanceof Op.Multiply) { Op.Multiply multiply = (Op.Multiply) message; System.out.println("Calculating " + multiply.getN1() + " * " + multiply.getN2()); getSender().tell(new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2())); + } else if (message instanceof Op.Divide) { Op.Divide divide = (Op.Divide) message; System.out.println("Calculating " + divide.getN1() + " / " + divide.getN2()); getSender().tell(new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2())); + } else { unhandled(message); } } } +//#actor diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java index 5144c90e0f..9887e01511 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java @@ -9,6 +9,7 @@ import akka.actor.Props; import akka.kernel.Bootable; import com.typesafe.config.ConfigFactory; +//#setup public class JCalculatorApplication implements Bootable { private ActorSystem system; @@ -26,3 +27,4 @@ public class JCalculatorApplication implements Bootable { system.shutdown(); } } +//#setup \ No newline at end of file diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java index dc303e55ca..64200c26de 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java @@ -8,24 +8,35 @@ import akka.actor.UntypedActor; import java.text.DecimalFormat; import java.text.NumberFormat; +//#actor public class JCreationActor extends UntypedActor { private static final NumberFormat formatter = new DecimalFormat("#0.00"); @Override public void onReceive(Object message) throws Exception { + if (message instanceof InternalMsg.MathOpMsg) { + // forward math op to server actor InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message; msg.getActor().tell(msg.getMathOp(), getSelf()); + } else if (message instanceof Op.MathResult) { + + // receive reply from server actor + if (message instanceof Op.MultiplicationResult) { Op.MultiplicationResult result = (Op.MultiplicationResult) message; System.out.println("Mul result: " + result.getN1() + " * " + result.getN2() + " = " + result.getResult()); + } else if (message instanceof Op.DivisionResult) { Op.DivisionResult result = (Op.DivisionResult) message; System.out.println("Div result: " + result.getN1() + " / " + result.getN2() + " = " + formatter.format(result.getResult())); } + } else { + unhandled(message); } } } +//#actor diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java index 82d5ed373f..ca2e961a5b 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java @@ -9,6 +9,7 @@ import akka.actor.Props; import akka.kernel.Bootable; import com.typesafe.config.ConfigFactory; +//#setup public class JCreationApplication implements Bootable { private ActorSystem system; private ActorRef actor; @@ -33,3 +34,4 @@ public class JCreationApplication implements Bootable { system.shutdown(); } } +//#setup diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java index 52a2f1d67c..a690bc0024 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java @@ -5,23 +5,35 @@ package sample.remote.calculator.java; import akka.actor.UntypedActor; +//#actor public class JLookupActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { + if (message instanceof InternalMsg.MathOpMsg) { + + // send message to server actor InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message; msg.getActor().tell(msg.getMathOp(), getSelf()); + } else if (message instanceof Op.MathResult) { + + // receive reply from server actor + if (message instanceof Op.AddResult) { Op.AddResult result = (Op.AddResult) message; System.out.println("Add result: " + result.getN1() + " + " + result.getN2() + " = " + result.getResult()); + } else if (message instanceof Op.SubtractResult) { Op.SubtractResult result = (Op.SubtractResult) message; System.out.println("Sub result: " + result.getN1() + " - " + result.getN2() + " = " + result.getResult()); } + } else { + unhandled(message); } } } +//#actor diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java index f70fda5baa..6baf07a49a 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java @@ -3,12 +3,16 @@ */ package sample.remote.calculator.java; +//#imports import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.UntypedActor; import akka.kernel.Bootable; import com.typesafe.config.ConfigFactory; +//#imports +//#setup public class JLookupApplication implements Bootable { private ActorSystem system; private ActorRef actor; @@ -33,3 +37,4 @@ public class JLookupApplication implements Bootable { system.shutdown(); } } +//#setup diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java index fb59702458..fdfe0ad646 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java @@ -5,19 +5,24 @@ package sample.remote.calculator.java; import akka.actor.UntypedActor; +//#actor public class JSimpleCalculatorActor extends UntypedActor { @Override public void onReceive(Object message) { + if (message instanceof Op.Add) { Op.Add add = (Op.Add) message; System.out.println("Calculating " + add.getN1() + " + " + add.getN2()); getSender().tell(new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2())); + } else if (message instanceof Op.Subtract) { Op.Subtract subtract = (Op.Subtract) message; System.out.println("Calculating " + subtract.getN1() + " - " + subtract.getN2()); getSender().tell(new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2())); + } else { unhandled(message); } } } +//#actor diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index dd3f978a5f..31abb52dac 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -314,10 +314,6 @@ object AkkaBuild extends Build { if (true || (System getProperty "java.runtime.version" startsWith "1.7")) Seq() else Seq("-optimize")), // -optimize fails with jdk7 javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"), - // add config dir to classpaths - unmanagedClasspath in Runtime <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") }, - unmanagedClasspath in Test <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") }, - parallelExecution in Test := System.getProperty("akka.parallelExecution", "true").toBoolean, // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)