Merge pull request #1993 from akka/wip-2922-event-bus-sample-patriknw
!act,doc #2922 Doc event bus and fix Java API
This commit is contained in:
commit
947b49c5b7
12 changed files with 801 additions and 166 deletions
317
akka-docs/rst/java/code/docs/event/EventBusDocTest.java
Normal file
317
akka-docs/rst/java/code/docs/event/EventBusDocTest.java
Normal file
|
|
@ -0,0 +1,317 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.event;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.event.japi.EventBus;
|
||||
|
||||
//#lookup-bus
|
||||
import akka.event.japi.LookupEventBus;
|
||||
|
||||
//#lookup-bus
|
||||
|
||||
//#subchannel-bus
|
||||
import akka.event.japi.SubchannelEventBus;
|
||||
import akka.util.Subclassification;
|
||||
|
||||
//#subchannel-bus
|
||||
|
||||
//#scanning-bus
|
||||
import akka.event.japi.ScanningEventBus;
|
||||
|
||||
//#scanning-bus
|
||||
|
||||
//#actor-bus
|
||||
import akka.event.japi.ActorEventBus;
|
||||
|
||||
//#actor-bus
|
||||
|
||||
public class EventBusDocTest {
|
||||
|
||||
public static class Event {}
|
||||
public static class Subscriber {}
|
||||
public static class Classifier {}
|
||||
|
||||
static public interface EventBusApi extends EventBus<Event, Subscriber, Classifier> {
|
||||
|
||||
@Override
|
||||
//#event-bus-api
|
||||
/**
|
||||
* Attempts to register the subscriber to the specified Classifier
|
||||
* @return true if successful and false if not (because it was already
|
||||
* subscribed to that Classifier, or otherwise)
|
||||
*/
|
||||
public boolean subscribe(Subscriber subscriber, Classifier to);
|
||||
|
||||
//#event-bus-api
|
||||
|
||||
@Override
|
||||
//#event-bus-api
|
||||
/**
|
||||
* Attempts to deregister the subscriber from the specified Classifier
|
||||
* @return true if successful and false if not (because it wasn't subscribed
|
||||
* to that Classifier, or otherwise)
|
||||
*/
|
||||
public boolean unsubscribe(Subscriber subscriber, Classifier from);
|
||||
|
||||
//#event-bus-api
|
||||
|
||||
@Override
|
||||
//#event-bus-api
|
||||
/**
|
||||
* Attempts to deregister the subscriber from all Classifiers it may be subscribed to
|
||||
*/
|
||||
public void unsubscribe(Subscriber subscriber);
|
||||
|
||||
//#event-bus-api
|
||||
|
||||
@Override
|
||||
//#event-bus-api
|
||||
/**
|
||||
* Publishes the specified Event to this bus
|
||||
*/
|
||||
public void publish(Event event);
|
||||
|
||||
//#event-bus-api
|
||||
|
||||
}
|
||||
|
||||
static
|
||||
//#lookup-bus
|
||||
public class MsgEnvelope {
|
||||
public final String topic;
|
||||
public final Object payload;
|
||||
|
||||
public MsgEnvelope(String topic, Object payload) {
|
||||
this.topic = topic;
|
||||
this.payload = payload;
|
||||
}
|
||||
}
|
||||
|
||||
//#lookup-bus
|
||||
static
|
||||
//#lookup-bus
|
||||
/**
|
||||
* Publishes the payload of the MsgEnvelope when the topic of the
|
||||
* MsgEnvelope equals the String specified when subscribing.
|
||||
*/
|
||||
public class LookupBusImpl extends LookupEventBus<MsgEnvelope, ActorRef, String> {
|
||||
|
||||
// is used for extracting the classifier from the incoming events
|
||||
@Override public String classify(MsgEnvelope event) {
|
||||
return event.topic;
|
||||
}
|
||||
|
||||
// will be invoked for each event for all subscribers which registered themselves
|
||||
// for the event’s classifier
|
||||
@Override public void publish(MsgEnvelope event, ActorRef subscriber) {
|
||||
subscriber.tell(event.payload, ActorRef.noSender());
|
||||
}
|
||||
|
||||
// must define a full order over the subscribers, expressed as expected from
|
||||
// `java.lang.Comparable.compare`
|
||||
@Override public int compareSubscribers(ActorRef a, ActorRef b) {
|
||||
return a.compareTo(b);
|
||||
}
|
||||
|
||||
// determines the initial size of the index data structure
|
||||
// used internally (i.e. the expected number of different classifiers)
|
||||
@Override public int mapSize() {
|
||||
return 128;
|
||||
}
|
||||
|
||||
}
|
||||
//#lookup-bus
|
||||
|
||||
static
|
||||
//#subchannel-bus
|
||||
public class StartsWithSubclassification implements Subclassification<String> {
|
||||
@Override public boolean isEqual(String x, String y) {
|
||||
return x.equals(y);
|
||||
}
|
||||
|
||||
@Override public boolean isSubclass(String x, String y) {
|
||||
return x.startsWith(y);
|
||||
}
|
||||
}
|
||||
|
||||
//#subchannel-bus
|
||||
|
||||
static
|
||||
//#subchannel-bus
|
||||
/**
|
||||
* Publishes the payload of the MsgEnvelope when the topic of the
|
||||
* MsgEnvelope starts with the String specified when subscribing.
|
||||
*/
|
||||
public class SubchannelBusImpl extends SubchannelEventBus<MsgEnvelope, ActorRef, String> {
|
||||
|
||||
// Subclassification is an object providing `isEqual` and `isSubclass`
|
||||
// to be consumed by the other methods of this classifier
|
||||
@Override public Subclassification<String> subclassification() {
|
||||
return new StartsWithSubclassification();
|
||||
}
|
||||
|
||||
// is used for extracting the classifier from the incoming events
|
||||
@Override public String classify(MsgEnvelope event) {
|
||||
return event.topic;
|
||||
}
|
||||
|
||||
// will be invoked for each event for all subscribers which registered themselves
|
||||
// for the event’s classifier
|
||||
@Override public void publish(MsgEnvelope event, ActorRef subscriber) {
|
||||
subscriber.tell(event.payload, ActorRef.noSender());
|
||||
}
|
||||
|
||||
}
|
||||
//#subchannel-bus
|
||||
|
||||
static
|
||||
//#scanning-bus
|
||||
/**
|
||||
* Publishes String messages with length less than or equal to the length
|
||||
* specified when subscribing.
|
||||
*/
|
||||
public class ScanningBusImpl extends ScanningEventBus<String, ActorRef, Integer> {
|
||||
|
||||
// is needed for determining matching classifiers and storing them in an
|
||||
// ordered collection
|
||||
@Override public int compareClassifiers(Integer a, Integer b) {
|
||||
return a.compareTo(b);
|
||||
}
|
||||
|
||||
// is needed for storing subscribers in an ordered collection
|
||||
@Override public int compareSubscribers(ActorRef a, ActorRef b) {
|
||||
return a.compareTo(b);
|
||||
}
|
||||
|
||||
// determines whether a given classifier shall match a given event; it is invoked
|
||||
// for each subscription for all received events, hence the name of the classifier
|
||||
@Override public boolean matches(Integer classifier, String event) {
|
||||
return event.length() <= classifier;
|
||||
}
|
||||
|
||||
// will be invoked for each event for all subscribers which registered themselves
|
||||
// for the event’s classifier
|
||||
@Override public void publish(String event, ActorRef subscriber) {
|
||||
subscriber.tell(event, ActorRef.noSender());
|
||||
}
|
||||
|
||||
}
|
||||
//#scanning-bus
|
||||
|
||||
static
|
||||
//#actor-bus
|
||||
public class Notification {
|
||||
public final ActorRef ref;
|
||||
public final int id;
|
||||
|
||||
public Notification(ActorRef ref, int id) {
|
||||
this.ref = ref;
|
||||
this.id = id;
|
||||
}
|
||||
}
|
||||
|
||||
//#actor-bus
|
||||
|
||||
static
|
||||
//#actor-bus
|
||||
public class ActorBusImpl extends ActorEventBus<Notification> {
|
||||
// is used for extracting the classifier from the incoming events
|
||||
@Override public ActorRef classify(Notification event) {
|
||||
return event.ref;
|
||||
}
|
||||
|
||||
// determines the initial size of the index data structure
|
||||
// used internally (i.e. the expected number of different classifiers)
|
||||
@Override public int mapSize() {
|
||||
return 128;
|
||||
}
|
||||
|
||||
}
|
||||
//#actor-bus
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource =
|
||||
new AkkaJUnitActorSystemResource("EventBusDocTest");
|
||||
|
||||
private final ActorSystem system = actorSystemResource.getSystem();
|
||||
|
||||
@Test
|
||||
public void demonstrateLookupClassification() {
|
||||
new JavaTestKit(system) {{
|
||||
//#lookup-bus-test
|
||||
LookupBusImpl lookupBus = new LookupBusImpl();
|
||||
lookupBus.subscribe(getTestActor(), "greetings");
|
||||
lookupBus.publish(new MsgEnvelope("time", System.currentTimeMillis()));
|
||||
lookupBus.publish(new MsgEnvelope("greetings", "hello"));
|
||||
expectMsgEquals("hello");
|
||||
//#lookup-bus-test
|
||||
}};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateSubchannelClassification() {
|
||||
new JavaTestKit(system) {{
|
||||
//#subchannel-bus-test
|
||||
SubchannelBusImpl subchannelBus = new SubchannelBusImpl();
|
||||
subchannelBus.subscribe(getTestActor(), "abc");
|
||||
subchannelBus.publish(new MsgEnvelope("xyzabc", "x"));
|
||||
subchannelBus.publish(new MsgEnvelope("bcdef", "b"));
|
||||
subchannelBus.publish(new MsgEnvelope("abc", "c"));
|
||||
expectMsgEquals("c");
|
||||
subchannelBus.publish(new MsgEnvelope("abcdef", "d"));
|
||||
expectMsgEquals("d");
|
||||
//#subchannel-bus-test
|
||||
}};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateScanningClassification() {
|
||||
new JavaTestKit(system) {{
|
||||
//#scanning-bus-test
|
||||
ScanningBusImpl scanningBus = new ScanningBusImpl();
|
||||
scanningBus.subscribe(getTestActor(), 3);
|
||||
scanningBus.publish("xyzabc");
|
||||
scanningBus.publish("ab");
|
||||
expectMsgEquals("ab");
|
||||
scanningBus.publish("abc");
|
||||
expectMsgEquals("abc");
|
||||
//#scanning-bus-test
|
||||
}};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateActorClassification() {
|
||||
//#actor-bus-test
|
||||
ActorRef observer1 = new JavaTestKit(system).getRef();
|
||||
ActorRef observer2 = new JavaTestKit(system).getRef();
|
||||
JavaTestKit probe1 = new JavaTestKit(system);
|
||||
JavaTestKit probe2 = new JavaTestKit(system);
|
||||
ActorRef subscriber1 = probe1.getRef();
|
||||
ActorRef subscriber2 = probe2.getRef();
|
||||
ActorBusImpl actorBus = new ActorBusImpl();
|
||||
actorBus.subscribe(subscriber1, observer1);
|
||||
actorBus.subscribe(subscriber2, observer1);
|
||||
actorBus.subscribe(subscriber2, observer2);
|
||||
Notification n1 = new Notification(observer1, 100);
|
||||
actorBus.publish(n1);
|
||||
probe1.expectMsgEquals(n1);
|
||||
probe2.expectMsgEquals(n1);
|
||||
Notification n2 = new Notification(observer2, 101);
|
||||
actorBus.publish(n2);
|
||||
probe2.expectMsgEquals(n2);
|
||||
probe1.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
|
||||
//#actor-bus-test
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,25 +1,15 @@
|
|||
.. _event-bus-java:
|
||||
|
||||
################
|
||||
Event Bus
|
||||
################
|
||||
###########
|
||||
Event Bus
|
||||
###########
|
||||
|
||||
|
||||
Originally conceived as a way to send messages to groups of actors, the
|
||||
:class:`EventBus` has been generalized into a set of composable traits
|
||||
:class:`EventBus` has been generalized into a set of abstract base classes
|
||||
implementing a simple interface:
|
||||
|
||||
- :meth:`public boolean subscribe(S subscriber, C classifier)` subscribes the
|
||||
given subscriber to events with the given classifier
|
||||
|
||||
- :meth:`public boolean unsubscribe(S subscriber, C classifier)` undoes a
|
||||
specific subscription
|
||||
|
||||
- :meth:`public void unsubscribe(S subscriber)` undoes all subscriptions for
|
||||
the given subscriber
|
||||
|
||||
- :meth:`public void publish(E event)` publishes an event, which first is classified
|
||||
according to the specific bus (see `Classifiers`_) and then published to all
|
||||
subscribers for the obtained classifier
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#event-bus-api
|
||||
|
||||
.. note::
|
||||
|
||||
|
|
@ -27,18 +17,18 @@ implementing a simple interface:
|
|||
published messages. If you need a reference to the original sender
|
||||
you have to provide it inside the message.
|
||||
|
||||
This mechanism is used in different places within Akka, e.g. the
|
||||
:ref:`DeathWatch <deathwatch-java>` and the `Event Stream`_. Implementations
|
||||
can make use of the specific building blocks presented below.
|
||||
This mechanism is used in different places within Akka, e.g. the `Event Stream`_.
|
||||
Implementations can make use of the specific building blocks presented below.
|
||||
|
||||
An event bus must define the following three abstract types:
|
||||
An event bus must define the following three type parameters:
|
||||
|
||||
- :class:`E` is the type of all events published on that bus
|
||||
- :class:`Event` (E) is the type of all events published on that bus
|
||||
|
||||
- :class:`S` is the type of subscribers allowed to register on that event bus
|
||||
- :class:`Subscriber` (S) is the type of subscribers allowed to register on that
|
||||
event bus
|
||||
|
||||
- :class:`C` defines the classifier to be used in selecting subscribers for
|
||||
dispatching events
|
||||
- :class:`Classifier` (C) defines the classifier to be used in selecting
|
||||
subscribers for dispatching events
|
||||
|
||||
The traits below are still generic in these types, but they need to be defined
|
||||
for any concrete implementation.
|
||||
|
|
@ -48,33 +38,24 @@ Classifiers
|
|||
|
||||
The classifiers presented here are part of the Akka distribution, but rolling
|
||||
your own in case you do not find a perfect match is not difficult, check the
|
||||
implementation of the existing ones on `github`_.
|
||||
|
||||
.. _github: https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/event/EventBus.scala
|
||||
implementation of the existing ones on `github <@github@/akka-actor/src/main/scala/akka/event/EventBus.scala>`_
|
||||
|
||||
Lookup Classification
|
||||
---------------------
|
||||
|
||||
The simplest classification is just to extract an arbitrary classifier from
|
||||
each event and maintaining a set of subscribers for each possible classifier.
|
||||
This can be compared to tuning in on a radio station. The abstract class
|
||||
:class:`LookupEventBus` is still generic in that it abstracts over how to
|
||||
compare subscribers and how exactly to classify. The necessary methods to be
|
||||
implemented are the following:
|
||||
This can be compared to tuning in on a radio station. The trait
|
||||
:class:`LookupClassification` is still generic in that it abstracts over how to
|
||||
compare subscribers and how exactly to classify.
|
||||
|
||||
- :meth:`public C classify(E event)` is used for extracting the
|
||||
classifier from the incoming events.
|
||||
The necessary methods to be implemented are illustrated with the following example:
|
||||
|
||||
- :meth:`public int compareSubscribers(S a, S b)` must define a
|
||||
partial order over the subscribers, expressed as expected from
|
||||
:meth:`java.lang.Comparable.compare`.
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#lookup-bus
|
||||
|
||||
- :meth:`public void publish(E event, S subscriber)` will be invoked for
|
||||
each event for all subscribers which registered themselves for the event’s
|
||||
classifier.
|
||||
A test for this implementation may look like this:
|
||||
|
||||
- :meth:`public int mapSize()` determines the initial size of the index data structure
|
||||
used internally (i.e. the expected number of different classifiers).
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#lookup-bus-test
|
||||
|
||||
This classifier is efficient in case no subscribers exist for a particular event.
|
||||
|
||||
|
|
@ -87,21 +68,15 @@ can be compared to tuning in on (possibly multiple) radio channels by genre.
|
|||
This classification has been developed for the case where the classifier is
|
||||
just the JVM class of the event and subscribers may be interested in
|
||||
subscribing to all subclasses of a certain class, but it may be used with any
|
||||
classifier hierarchy. The abstract members needed by this classifier are
|
||||
classifier hierarchy.
|
||||
|
||||
- :meth:`public Subclassification[C] subclassification()` provides an object
|
||||
providing :meth:`isEqual(a: Classifier, b: Classifier)` and
|
||||
:meth:`isSubclass(a: Classifier, b: Classifier)` to be consumed by the other
|
||||
methods of this classifier; this method is called on various occasions, it
|
||||
should be implemented so that it always returns the same object for
|
||||
performance reasons.
|
||||
The necessary methods to be implemented are illustrated with the following example:
|
||||
|
||||
- :meth:`public C classify(E event)` is used for extracting the classifier from
|
||||
the incoming events.
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#subchannel-bus
|
||||
|
||||
- :meth:`public void publish(E event, S subscriber)` will be invoked for
|
||||
each event for all subscribers which registered themselves for the event’s
|
||||
classifier.
|
||||
A test for this implementation may look like this:
|
||||
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#subchannel-bus-test
|
||||
|
||||
This classifier is also efficient in case no subscribers are found for an
|
||||
event, but it uses conventional locking to synchronize an internal classifier
|
||||
|
|
@ -117,21 +92,14 @@ strictly hierarchical, this classifier is useful if there are overlapping
|
|||
classifiers which cover various parts of the event space without forming a
|
||||
hierarchy. It can be compared to tuning in on (possibly multiple) radio
|
||||
stations by geographical reachability (for old-school radio-wave transmission).
|
||||
The abstract members for this classifier are:
|
||||
|
||||
- :meth:`public int compareClassifiers(C a, C b)` is needed for
|
||||
determining matching classifiers and storing them in an ordered collection.
|
||||
The necessary methods to be implemented are illustrated with the following example:
|
||||
|
||||
- :meth:`public int compareSubscribers(S a, S b)` is needed for
|
||||
storing subscribers in an ordered collection.
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#scanning-bus
|
||||
|
||||
- :meth:`public boolean matches(C classifier, E event)` determines
|
||||
whether a given classifier shall match a given event; it is invoked for each
|
||||
subscription for all received events, hence the name of the classifier.
|
||||
A test for this implementation may look like this:
|
||||
|
||||
- :meth:`public void publish(E event, S subscriber)` will be invoked for
|
||||
each event for all subscribers which registered themselves for a classifier
|
||||
matching this event.
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#scanning-bus-test
|
||||
|
||||
This classifier takes always a time which is proportional to the number of
|
||||
subscriptions, independent of how many actually match.
|
||||
|
|
@ -139,15 +107,17 @@ subscriptions, independent of how many actually match.
|
|||
Actor Classification
|
||||
--------------------
|
||||
|
||||
This classification has been developed specifically for implementing
|
||||
This classification was originally developed specifically for implementing
|
||||
:ref:`DeathWatch <deathwatch-java>`: subscribers as well as classifiers are of
|
||||
type :class:`ActorRef`. The abstract members are
|
||||
type :class:`ActorRef`.
|
||||
|
||||
- :meth:`public ActorRef classify(E event)` is used for extracting the
|
||||
classifier from the incoming events.
|
||||
The necessary methods to be implemented are illustrated with the following example:
|
||||
|
||||
- :meth:`public int mapSize()` determines the initial size of the index data structure
|
||||
used internally (i.e. the expected number of different classifiers).
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#actor-bus
|
||||
|
||||
A test for this implementation may look like this:
|
||||
|
||||
.. includecode:: code/docs/event/EventBusDocTest.java#actor-bus-test
|
||||
|
||||
This classifier is still is generic in the event type, and it is efficient for
|
||||
all use cases.
|
||||
|
|
@ -161,7 +131,7 @@ The event stream is the main event bus of each actor system: it is used for
|
|||
carrying :ref:`log messages <logging-java>` and `Dead Letters`_ and may be
|
||||
used by the user code for other purposes as well. It uses `Subchannel
|
||||
Classification`_ which enables registering to related sets of channels (as is
|
||||
used for :class:`RemoteLifeCycleMessage`). The following example demonstrates
|
||||
used for :class:`RemotingLifecycleEvent`). The following example demonstrates
|
||||
how a simple subscription works. Given a simple actor:
|
||||
|
||||
.. includecode:: code/docs/event/LoggingDocTest.java#imports-deadletter
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue