From a247365b576322d16c5947e909f9485d5ddeb2c3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 6 Feb 2014 15:08:51 +0100 Subject: [PATCH] !act,doc #2922 Doc event bus and fix Java API --- .../test/scala/akka/event/EventBusSpec.scala | 24 +- .../src/main/scala/akka/event/EventBus.scala | 11 +- .../akka/event/japi/EventBusJavaAPI.scala | 195 ++++++++++- akka-docs/rst/additional/index.rst | 1 - akka-docs/rst/additional/recipes.rst | 4 - akka-docs/rst/dev/developer-guidelines.rst | 2 - .../java/code/docs/event/EventBusDocTest.java | 317 ++++++++++++++++++ akka-docs/rst/java/event-bus.rst | 112 +++---- akka-docs/rst/project/licenses.rst | 3 +- .../code/docs/event/EventBusDocSpec.scala | 202 +++++++++++ akka-docs/rst/scala/event-bus.rst | 92 ++--- akka-docs/rst/scala/fsm.rst | 4 +- .../RemoteConsistentHashingRouterSpec.scala | 4 +- 13 files changed, 803 insertions(+), 168 deletions(-) delete mode 100644 akka-docs/rst/additional/recipes.rst create mode 100644 akka-docs/rst/java/code/docs/event/EventBusDocTest.java create mode 100644 akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index b3abdbef75..85dd584d59 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -171,9 +171,12 @@ class ActorEventBusSpec extends EventBusSpec("ActorEventBus") { } object ScanningEventBusSpec { - import akka.event.japi.ScanningEventBus - class MyScanningEventBus extends ScanningEventBus[Int, akka.japi.Procedure[Int], String] { + class MyScanningEventBus extends EventBus with ScanningClassification { + type Event = Int + type Subscriber = Procedure[Int] + type Classifier = String + protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b) @@ -200,11 +203,17 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { } object LookupEventBusSpec { - class MyLookupEventBus extends akka.event.japi.LookupEventBus[Int, akka.japi.Procedure[Int], String] { - protected def classify(event: Event): Classifier = event.toString - protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b) - protected def mapSize = 32 - protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) + class MyLookupEventBus extends EventBus with LookupClassification { + type Event = Int + type Subscriber = Procedure[Int] + type Classifier = String + + override protected def classify(event: Int): String = event.toString + override protected def compareSubscribers(a: Procedure[Int], b: Procedure[Int]): Int = + akka.util.Helpers.compareIdentityHash(a, b) + override protected def mapSize = 32 + override protected def publish(event: Int, subscriber: Procedure[Int]): Unit = + subscriber(event) } } @@ -223,3 +232,4 @@ class LookupEventBusSpec extends EventBusSpec("LookupEventBus") { def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = () } + diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index c468aaa20c..9bbe4d86d0 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -23,15 +23,18 @@ trait EventBus { type Classifier type Subscriber + //#event-bus-api /** * Attempts to register the subscriber to the specified Classifier - * @return true if successful and false if not (because it was already subscribed to that Classifier, or otherwise) + * @return true if successful and false if not (because it was already + * subscribed to that Classifier, or otherwise) */ def subscribe(subscriber: Subscriber, to: Classifier): Boolean /** * Attempts to deregister the subscriber from the specified Classifier - * @return true if successful and false if not (because it wasn't subscribed to that Classifier, or otherwise) + * @return true if successful and false if not (because it wasn't subscribed + * to that Classifier, or otherwise) */ def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean @@ -44,6 +47,7 @@ trait EventBus { * Publishes the specified Event to this bus */ def publish(event: Event): Unit + //#event-bus-api } /** @@ -118,6 +122,9 @@ trait LookupClassification { this: EventBus ⇒ */ trait SubchannelClassification { this: EventBus ⇒ + /** + * The logic to form sub-class hierarchy + */ protected implicit def subclassification: Subclassification[Classifier] // must be lazy to avoid initialization order problem with subclassification diff --git a/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala b/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala index 7a963046e8..448293914b 100644 --- a/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala +++ b/akka-actor/src/main/scala/akka/event/japi/EventBusJavaAPI.scala @@ -3,18 +3,89 @@ */ package akka.event.japi -import akka.event._ +import akka.util.Subclassification +import akka.actor.ActorRef /** - * See documentation for [[akka.event.LookupClassification]] + * Java API: See documentation for [[akka.event.EventBus]] * E is the Event type * S is the Subscriber type * C is the Classifier type */ -abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassification { - type Event = E - type Subscriber = S - type Classifier = C +trait EventBus[E, S, C] { + + /** + * Attempts to register the subscriber to the specified Classifier + * @return true if successful and false if not (because it was already subscribed to that Classifier, or otherwise) + */ + def subscribe(subscriber: S, to: C): Boolean + + /** + * Attempts to deregister the subscriber from the specified Classifier + * @return true if successful and false if not (because it wasn't subscribed to that Classifier, or otherwise) + */ + def unsubscribe(subscriber: S, from: C): Boolean + + /** + * Attempts to deregister the subscriber from all Classifiers it may be subscribed to + */ + def unsubscribe(subscriber: S): Unit + + /** + * Publishes the specified Event to this bus + */ + def publish(event: E): Unit +} + +/** + * Java API: See documentation for [[akka.event.LookupClassification]] + * E is the Event type + * S is the Subscriber type + * C is the Classifier type + */ +abstract class LookupEventBus[E, S, C] extends EventBus[E, S, C] { + private val bus = new akka.event.EventBus with akka.event.LookupClassification { + type Event = E + type Subscriber = S + type Classifier = C + + override protected def mapSize: Int = LookupEventBus.this.mapSize + + override protected def compareSubscribers(a: S, b: S): Int = + LookupEventBus.this.compareSubscribers(a, b) + + override protected def classify(event: E): C = + LookupEventBus.this.classify(event) + + override protected def publish(event: E, subscriber: S): Unit = + LookupEventBus.this.publish(event, subscriber) + } + + /** + * This is a size hint for the number of Classifiers you expect to have (use powers of 2) + */ + protected def mapSize(): Int + + /** + * Provides a total ordering of Subscribers (think java.util.Comparator.compare) + */ + protected def compareSubscribers(a: S, b: S): Int + + /** + * Returns the Classifier associated with the given Event + */ + protected def classify(event: E): C + + /** + * Publishes the given Event to the given Subscriber + */ + protected def publish(event: E, subscriber: S): Unit + + override def subscribe(subscriber: S, to: C): Boolean = bus.subscribe(subscriber, to) + override def unsubscribe(subscriber: S, from: C): Boolean = bus.unsubscribe(subscriber, from) + override def unsubscribe(subscriber: S): Unit = bus.unsubscribe(subscriber) + override def publish(event: E): Unit = bus.publish(event) + } /** @@ -23,10 +94,43 @@ abstract class LookupEventBus[E, S, C] extends EventBus with LookupClassificatio * S is the Subscriber type * C is the Classifier type */ -abstract class SubchannelEventBus[E, S, C] extends EventBus with SubchannelClassification { - type Event = E - type Subscriber = S - type Classifier = C + +abstract class SubchannelEventBus[E, S, C] extends EventBus[E, S, C] { + private val bus = new akka.event.EventBus with akka.event.SubchannelClassification { + type Event = E + type Subscriber = S + type Classifier = C + + override protected def subclassification: Subclassification[Classifier] = + SubchannelEventBus.this.subclassification + + override protected def classify(event: Event): Classifier = + SubchannelEventBus.this.classify(event) + + override protected def publish(event: Event, subscriber: Subscriber): Unit = + SubchannelEventBus.this.publish(event, subscriber) + } + + /** + * The logic to form sub-class hierarchy + */ + def subclassification: Subclassification[C] + + /** + * Returns the Classifier associated with the given Event + */ + protected def classify(event: E): C + + /** + * Publishes the given Event to the given Subscriber + */ + protected def publish(event: E, subscriber: S): Unit + + override def subscribe(subscriber: S, to: C): Boolean = bus.subscribe(subscriber, to) + override def unsubscribe(subscriber: S, from: C): Boolean = bus.unsubscribe(subscriber, from) + override def unsubscribe(subscriber: S): Unit = bus.unsubscribe(subscriber) + override def publish(event: E): Unit = bus.publish(event) + } /** @@ -35,10 +139,49 @@ abstract class SubchannelEventBus[E, S, C] extends EventBus with SubchannelClass * S is the Subscriber type * C is the Classifier type */ -abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassification { - type Event = E - type Subscriber = S - type Classifier = C +abstract class ScanningEventBus[E, S, C] extends EventBus[E, S, C] { + private val bus = new akka.event.EventBus with akka.event.ScanningClassification { + type Event = E + type Subscriber = S + type Classifier = C + + override protected def compareClassifiers(a: C, b: C): Int = + ScanningEventBus.this.compareClassifiers(a, b) + + override protected def compareSubscribers(a: S, b: S): Int = + ScanningEventBus.this.compareSubscribers(a, b) + + override protected def matches(classifier: C, event: E): Boolean = + ScanningEventBus.this.matches(classifier, event) + + override protected def publish(event: E, subscriber: S): Unit = + ScanningEventBus.this.publish(event, subscriber) + } + + /** + * Provides a total ordering of Classifiers (think java.util.Comparator.compare) + */ + protected def compareClassifiers(a: C, b: C): Int + + /** + * Provides a total ordering of Subscribers (think java.util.Comparator.compare) + */ + protected def compareSubscribers(a: S, b: S): Int + + /** + * Returns whether the specified Classifier matches the specified Event + */ + protected def matches(classifier: C, event: E): Boolean + + /** + * Publishes the specified Event to the specified Subscriber + */ + protected def publish(event: E, subscriber: S): Unit + + override def subscribe(subscriber: S, to: C): Boolean = bus.subscribe(subscriber, to) + override def unsubscribe(subscriber: S, from: C): Boolean = bus.unsubscribe(subscriber, from) + override def unsubscribe(subscriber: S): Unit = bus.unsubscribe(subscriber) + override def publish(event: E): Unit = bus.publish(event) } /** @@ -47,6 +190,28 @@ abstract class ScanningEventBus[E, S, C] extends EventBus with ScanningClassific * Means that ActorRefs "listen" to other ActorRefs * E is the Event type */ -abstract class ActorEventBus[E] extends akka.event.ActorEventBus with ActorClassification with ActorClassifier { +abstract class ActorEventBus[E] extends EventBus[E, ActorRef, ActorRef] { + private val bus = new akka.event.ActorEventBus with akka.event.ActorClassification with akka.event.ActorClassifier { + type Event = E + override protected def mapSize: Int = ActorEventBus.this.mapSize + + override protected def classify(event: E): ActorRef = + ActorEventBus.this.classify(event) + } + + /** + * This is a size hint for the number of Classifiers you expect to have (use powers of 2) + */ + protected def mapSize(): Int + + /** + * Returns the Classifier associated with the given Event + */ + protected def classify(event: E): ActorRef + + override def subscribe(subscriber: ActorRef, to: ActorRef): Boolean = bus.subscribe(subscriber, to) + override def unsubscribe(subscriber: ActorRef, from: ActorRef): Boolean = bus.unsubscribe(subscriber, from) + override def unsubscribe(subscriber: ActorRef): Unit = bus.unsubscribe(subscriber) + override def publish(event: E): Unit = bus.publish(event) } diff --git a/akka-docs/rst/additional/index.rst b/akka-docs/rst/additional/index.rst index 74880955fb..84c020449e 100644 --- a/akka-docs/rst/additional/index.rst +++ b/akka-docs/rst/additional/index.rst @@ -5,7 +5,6 @@ Additional Information :maxdepth: 2 books - recipes language-bindings osgi http diff --git a/akka-docs/rst/additional/recipes.rst b/akka-docs/rst/additional/recipes.rst deleted file mode 100644 index fb4682f2fa..0000000000 --- a/akka-docs/rst/additional/recipes.rst +++ /dev/null @@ -1,4 +0,0 @@ -Here is a list of recipes for all things Akka -============================================= - -* `Martin Krassers Akka Event Sourcing example `_ diff --git a/akka-docs/rst/dev/developer-guidelines.rst b/akka-docs/rst/dev/developer-guidelines.rst index 35064807c0..135f056834 100644 --- a/akka-docs/rst/dev/developer-guidelines.rst +++ b/akka-docs/rst/dev/developer-guidelines.rst @@ -61,8 +61,6 @@ All code that is checked in **should** have tests. All testing is done with ``Sc * Name tests as **Test.scala** if they do not depend on any external stuff. That keeps surefire happy. * Name tests as **Spec.scala** if they have external dependencies. -There is a testing standard that should be followed: `Ticket001Spec `_ - Actor TestKit ^^^^^^^^^^^^^ diff --git a/akka-docs/rst/java/code/docs/event/EventBusDocTest.java b/akka-docs/rst/java/code/docs/event/EventBusDocTest.java new file mode 100644 index 0000000000..9292673d0c --- /dev/null +++ b/akka-docs/rst/java/code/docs/event/EventBusDocTest.java @@ -0,0 +1,317 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package docs.event; + +import java.util.concurrent.TimeUnit; + +import org.junit.ClassRule; +import org.junit.Test; + +import scala.concurrent.duration.FiniteDuration; +import akka.actor.ActorSystem; +import akka.actor.ActorRef; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.JavaTestKit; +import akka.event.japi.EventBus; + +//#lookup-bus +import akka.event.japi.LookupEventBus; + +//#lookup-bus + +//#subchannel-bus +import akka.event.japi.SubchannelEventBus; +import akka.util.Subclassification; + +//#subchannel-bus + +//#scanning-bus +import akka.event.japi.ScanningEventBus; + +//#scanning-bus + +//#actor-bus +import akka.event.japi.ActorEventBus; + +//#actor-bus + +public class EventBusDocTest { + + public static class Event {} + public static class Subscriber {} + public static class Classifier {} + + static public interface EventBusApi extends EventBus { + + @Override + //#event-bus-api + /** + * Attempts to register the subscriber to the specified Classifier + * @return true if successful and false if not (because it was already + * subscribed to that Classifier, or otherwise) + */ + public boolean subscribe(Subscriber subscriber, Classifier to); + + //#event-bus-api + + @Override + //#event-bus-api + /** + * Attempts to deregister the subscriber from the specified Classifier + * @return true if successful and false if not (because it wasn't subscribed + * to that Classifier, or otherwise) + */ + public boolean unsubscribe(Subscriber subscriber, Classifier from); + + //#event-bus-api + + @Override + //#event-bus-api + /** + * Attempts to deregister the subscriber from all Classifiers it may be subscribed to + */ + public void unsubscribe(Subscriber subscriber); + + //#event-bus-api + + @Override + //#event-bus-api + /** + * Publishes the specified Event to this bus + */ + public void publish(Event event); + + //#event-bus-api + + } + + static + //#lookup-bus + public class MsgEnvelope { + public final String topic; + public final Object payload; + + public MsgEnvelope(String topic, Object payload) { + this.topic = topic; + this.payload = payload; + } + } + + //#lookup-bus + static + //#lookup-bus + /** + * Publishes the payload of the MsgEnvelope when the topic of the + * MsgEnvelope equals the String specified when subscribing. + */ + public class LookupBusImpl extends LookupEventBus { + + // is used for extracting the classifier from the incoming events + @Override public String classify(MsgEnvelope event) { + return event.topic; + } + + // will be invoked for each event for all subscribers which registered themselves + // for the event’s classifier + @Override public void publish(MsgEnvelope event, ActorRef subscriber) { + subscriber.tell(event.payload, ActorRef.noSender()); + } + + // must define a full order over the subscribers, expressed as expected from + // `java.lang.Comparable.compare` + @Override public int compareSubscribers(ActorRef a, ActorRef b) { + return a.compareTo(b); + } + + // determines the initial size of the index data structure + // used internally (i.e. the expected number of different classifiers) + @Override public int mapSize() { + return 128; + } + + } + //#lookup-bus + + static + //#subchannel-bus + public class StartsWithSubclassification implements Subclassification { + @Override public boolean isEqual(String x, String y) { + return x.equals(y); + } + + @Override public boolean isSubclass(String x, String y) { + return x.startsWith(y); + } + } + + //#subchannel-bus + + static + //#subchannel-bus + /** + * Publishes the payload of the MsgEnvelope when the topic of the + * MsgEnvelope starts with the String specified when subscribing. + */ + public class SubchannelBusImpl extends SubchannelEventBus { + + // Subclassification is an object providing `isEqual` and `isSubclass` + // to be consumed by the other methods of this classifier + @Override public Subclassification subclassification() { + return new StartsWithSubclassification(); + } + + // is used for extracting the classifier from the incoming events + @Override public String classify(MsgEnvelope event) { + return event.topic; + } + + // will be invoked for each event for all subscribers which registered themselves + // for the event’s classifier + @Override public void publish(MsgEnvelope event, ActorRef subscriber) { + subscriber.tell(event.payload, ActorRef.noSender()); + } + + } + //#subchannel-bus + + static + //#scanning-bus + /** + * Publishes String messages with length less than or equal to the length + * specified when subscribing. + */ + public class ScanningBusImpl extends ScanningEventBus { + + // is needed for determining matching classifiers and storing them in an + // ordered collection + @Override public int compareClassifiers(Integer a, Integer b) { + return a.compareTo(b); + } + + // is needed for storing subscribers in an ordered collection + @Override public int compareSubscribers(ActorRef a, ActorRef b) { + return a.compareTo(b); + } + + // determines whether a given classifier shall match a given event; it is invoked + // for each subscription for all received events, hence the name of the classifier + @Override public boolean matches(Integer classifier, String event) { + return event.length() <= classifier; + } + + // will be invoked for each event for all subscribers which registered themselves + // for the event’s classifier + @Override public void publish(String event, ActorRef subscriber) { + subscriber.tell(event, ActorRef.noSender()); + } + + } + //#scanning-bus + + static + //#actor-bus + public class Notification { + public final ActorRef ref; + public final int id; + + public Notification(ActorRef ref, int id) { + this.ref = ref; + this.id = id; + } + } + + //#actor-bus + + static + //#actor-bus + public class ActorBusImpl extends ActorEventBus { + // is used for extracting the classifier from the incoming events + @Override public ActorRef classify(Notification event) { + return event.ref; + } + + // determines the initial size of the index data structure + // used internally (i.e. the expected number of different classifiers) + @Override public int mapSize() { + return 128; + } + + } + //#actor-bus + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("EventBusDocTest"); + + private final ActorSystem system = actorSystemResource.getSystem(); + + @Test + public void demonstrateLookupClassification() { + new JavaTestKit(system) {{ + //#lookup-bus-test + LookupBusImpl lookupBus = new LookupBusImpl(); + lookupBus.subscribe(getTestActor(), "greetings"); + lookupBus.publish(new MsgEnvelope("time", System.currentTimeMillis())); + lookupBus.publish(new MsgEnvelope("greetings", "hello")); + expectMsgEquals("hello"); + //#lookup-bus-test + }}; + } + + @Test + public void demonstrateSubchannelClassification() { + new JavaTestKit(system) {{ + //#subchannel-bus-test + SubchannelBusImpl subchannelBus = new SubchannelBusImpl(); + subchannelBus.subscribe(getTestActor(), "abc"); + subchannelBus.publish(new MsgEnvelope("xyzabc", "x")); + subchannelBus.publish(new MsgEnvelope("bcdef", "b")); + subchannelBus.publish(new MsgEnvelope("abc", "c")); + expectMsgEquals("c"); + subchannelBus.publish(new MsgEnvelope("abcdef", "d")); + expectMsgEquals("d"); + //#subchannel-bus-test + }}; + } + + @Test + public void demonstrateScanningClassification() { + new JavaTestKit(system) {{ + //#scanning-bus-test + ScanningBusImpl scanningBus = new ScanningBusImpl(); + scanningBus.subscribe(getTestActor(), 3); + scanningBus.publish("xyzabc"); + scanningBus.publish("ab"); + expectMsgEquals("ab"); + scanningBus.publish("abc"); + expectMsgEquals("abc"); + //#scanning-bus-test + }}; + } + + @Test + public void demonstrateActorClassification() { + //#actor-bus-test + ActorRef observer1 = new JavaTestKit(system).getRef(); + ActorRef observer2 = new JavaTestKit(system).getRef(); + JavaTestKit probe1 = new JavaTestKit(system); + JavaTestKit probe2 = new JavaTestKit(system); + ActorRef subscriber1 = probe1.getRef(); + ActorRef subscriber2 = probe2.getRef(); + ActorBusImpl actorBus = new ActorBusImpl(); + actorBus.subscribe(subscriber1, observer1); + actorBus.subscribe(subscriber2, observer1); + actorBus.subscribe(subscriber2, observer2); + Notification n1 = new Notification(observer1, 100); + actorBus.publish(n1); + probe1.expectMsgEquals(n1); + probe2.expectMsgEquals(n1); + Notification n2 = new Notification(observer2, 101); + actorBus.publish(n2); + probe2.expectMsgEquals(n2); + probe1.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS)); + //#actor-bus-test + } + +} diff --git a/akka-docs/rst/java/event-bus.rst b/akka-docs/rst/java/event-bus.rst index 08d3930590..a3b7ed5cb9 100644 --- a/akka-docs/rst/java/event-bus.rst +++ b/akka-docs/rst/java/event-bus.rst @@ -1,25 +1,15 @@ .. _event-bus-java: -################ -Event Bus -################ +########### + Event Bus +########### + Originally conceived as a way to send messages to groups of actors, the -:class:`EventBus` has been generalized into a set of composable traits +:class:`EventBus` has been generalized into a set of abstract base classes implementing a simple interface: -- :meth:`public boolean subscribe(S subscriber, C classifier)` subscribes the - given subscriber to events with the given classifier - -- :meth:`public boolean unsubscribe(S subscriber, C classifier)` undoes a - specific subscription - -- :meth:`public void unsubscribe(S subscriber)` undoes all subscriptions for - the given subscriber - -- :meth:`public void publish(E event)` publishes an event, which first is classified - according to the specific bus (see `Classifiers`_) and then published to all - subscribers for the obtained classifier +.. includecode:: code/docs/event/EventBusDocTest.java#event-bus-api .. note:: @@ -27,18 +17,18 @@ implementing a simple interface: published messages. If you need a reference to the original sender you have to provide it inside the message. -This mechanism is used in different places within Akka, e.g. the -:ref:`DeathWatch ` and the `Event Stream`_. Implementations -can make use of the specific building blocks presented below. +This mechanism is used in different places within Akka, e.g. the `Event Stream`_. +Implementations can make use of the specific building blocks presented below. -An event bus must define the following three abstract types: +An event bus must define the following three type parameters: -- :class:`E` is the type of all events published on that bus +- :class:`Event` (E) is the type of all events published on that bus -- :class:`S` is the type of subscribers allowed to register on that event bus +- :class:`Subscriber` (S) is the type of subscribers allowed to register on that + event bus -- :class:`C` defines the classifier to be used in selecting subscribers for - dispatching events +- :class:`Classifier` (C) defines the classifier to be used in selecting + subscribers for dispatching events The traits below are still generic in these types, but they need to be defined for any concrete implementation. @@ -48,33 +38,24 @@ Classifiers The classifiers presented here are part of the Akka distribution, but rolling your own in case you do not find a perfect match is not difficult, check the -implementation of the existing ones on `github`_. - -.. _github: https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/event/EventBus.scala +implementation of the existing ones on `github <@github@/akka-actor/src/main/scala/akka/event/EventBus.scala>`_ Lookup Classification --------------------- The simplest classification is just to extract an arbitrary classifier from each event and maintaining a set of subscribers for each possible classifier. -This can be compared to tuning in on a radio station. The abstract class -:class:`LookupEventBus` is still generic in that it abstracts over how to -compare subscribers and how exactly to classify. The necessary methods to be -implemented are the following: +This can be compared to tuning in on a radio station. The trait +:class:`LookupClassification` is still generic in that it abstracts over how to +compare subscribers and how exactly to classify. -- :meth:`public C classify(E event)` is used for extracting the - classifier from the incoming events. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`public int compareSubscribers(S a, S b)` must define a - partial order over the subscribers, expressed as expected from - :meth:`java.lang.Comparable.compare`. +.. includecode:: code/docs/event/EventBusDocTest.java#lookup-bus -- :meth:`public void publish(E event, S subscriber)` will be invoked for - each event for all subscribers which registered themselves for the event’s - classifier. +A test for this implementation may look like this: -- :meth:`public int mapSize()` determines the initial size of the index data structure - used internally (i.e. the expected number of different classifiers). +.. includecode:: code/docs/event/EventBusDocTest.java#lookup-bus-test This classifier is efficient in case no subscribers exist for a particular event. @@ -87,21 +68,15 @@ can be compared to tuning in on (possibly multiple) radio channels by genre. This classification has been developed for the case where the classifier is just the JVM class of the event and subscribers may be interested in subscribing to all subclasses of a certain class, but it may be used with any -classifier hierarchy. The abstract members needed by this classifier are +classifier hierarchy. -- :meth:`public Subclassification[C] subclassification()` provides an object - providing :meth:`isEqual(a: Classifier, b: Classifier)` and - :meth:`isSubclass(a: Classifier, b: Classifier)` to be consumed by the other - methods of this classifier; this method is called on various occasions, it - should be implemented so that it always returns the same object for - performance reasons. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`public C classify(E event)` is used for extracting the classifier from - the incoming events. +.. includecode:: code/docs/event/EventBusDocTest.java#subchannel-bus -- :meth:`public void publish(E event, S subscriber)` will be invoked for - each event for all subscribers which registered themselves for the event’s - classifier. +A test for this implementation may look like this: + +.. includecode:: code/docs/event/EventBusDocTest.java#subchannel-bus-test This classifier is also efficient in case no subscribers are found for an event, but it uses conventional locking to synchronize an internal classifier @@ -117,21 +92,14 @@ strictly hierarchical, this classifier is useful if there are overlapping classifiers which cover various parts of the event space without forming a hierarchy. It can be compared to tuning in on (possibly multiple) radio stations by geographical reachability (for old-school radio-wave transmission). -The abstract members for this classifier are: -- :meth:`public int compareClassifiers(C a, C b)` is needed for - determining matching classifiers and storing them in an ordered collection. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`public int compareSubscribers(S a, S b)` is needed for - storing subscribers in an ordered collection. +.. includecode:: code/docs/event/EventBusDocTest.java#scanning-bus -- :meth:`public boolean matches(C classifier, E event)` determines - whether a given classifier shall match a given event; it is invoked for each - subscription for all received events, hence the name of the classifier. +A test for this implementation may look like this: -- :meth:`public void publish(E event, S subscriber)` will be invoked for - each event for all subscribers which registered themselves for a classifier - matching this event. +.. includecode:: code/docs/event/EventBusDocTest.java#scanning-bus-test This classifier takes always a time which is proportional to the number of subscriptions, independent of how many actually match. @@ -139,15 +107,17 @@ subscriptions, independent of how many actually match. Actor Classification -------------------- -This classification has been developed specifically for implementing +This classification was originally developed specifically for implementing :ref:`DeathWatch `: subscribers as well as classifiers are of -type :class:`ActorRef`. The abstract members are +type :class:`ActorRef`. -- :meth:`public ActorRef classify(E event)` is used for extracting the - classifier from the incoming events. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`public int mapSize()` determines the initial size of the index data structure - used internally (i.e. the expected number of different classifiers). +.. includecode:: code/docs/event/EventBusDocTest.java#actor-bus + +A test for this implementation may look like this: + +.. includecode:: code/docs/event/EventBusDocTest.java#actor-bus-test This classifier is still is generic in the event type, and it is efficient for all use cases. @@ -161,7 +131,7 @@ The event stream is the main event bus of each actor system: it is used for carrying :ref:`log messages ` and `Dead Letters`_ and may be used by the user code for other purposes as well. It uses `Subchannel Classification`_ which enables registering to related sets of channels (as is -used for :class:`RemoteLifeCycleMessage`). The following example demonstrates +used for :class:`RemotingLifecycleEvent`). The following example demonstrates how a simple subscription works. Given a simple actor: .. includecode:: code/docs/event/LoggingDocTest.java#imports-deadletter diff --git a/akka-docs/rst/project/licenses.rst b/akka-docs/rst/project/licenses.rst index 55e5659a89..7cf6965034 100644 --- a/akka-docs/rst/project/licenses.rst +++ b/akka-docs/rst/project/licenses.rst @@ -36,4 +36,5 @@ Licenses for Dependency Libraries --------------------------------- Each dependency and its license can be seen in the project build file (the comment on the side of each dependency): -``_ +`AkkaBuild.scala <@github@/project/AkkaBuild.scala#L1054>`_ + diff --git a/akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala b/akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala new file mode 100644 index 0000000000..96603fcbec --- /dev/null +++ b/akka-docs/rst/scala/code/docs/event/EventBusDocSpec.scala @@ -0,0 +1,202 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package docs.event + +import scala.concurrent.duration._ +import akka.testkit.AkkaSpec +import akka.actor.ActorRef +import akka.testkit.TestProbe + +object EventBusDocSpec { + + //#lookup-bus + import akka.event.EventBus + import akka.event.LookupClassification + + case class MsgEnvelope(topic: String, payload: Any) + + /** + * Publishes the payload of the MsgEnvelope when the topic of the + * MsgEnvelope equals the String specified when subscribing. + */ + class LookupBusImpl extends EventBus with LookupClassification { + type Event = MsgEnvelope + type Classifier = String + type Subscriber = ActorRef + + // is used for extracting the classifier from the incoming events + override protected def classify(event: Event): Classifier = event.topic + + // will be invoked for each event for all subscribers which registered themselves + // for the event’s classifier + override protected def publish(event: Event, subscriber: Subscriber): Unit = { + subscriber ! event.payload + } + + // must define a full order over the subscribers, expressed as expected from + // `java.lang.Comparable.compare` + override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = + a.compareTo(b) + + // determines the initial size of the index data structure + // used internally (i.e. the expected number of different classifiers) + override protected def mapSize: Int = 128 + + } + + //#lookup-bus + + //#subchannel-bus + import akka.util.Subclassification + + class StartsWithSubclassification extends Subclassification[String] { + override def isEqual(x: String, y: String): Boolean = + x == y + + override def isSubclass(x: String, y: String): Boolean = + x.startsWith(y) + } + + import akka.event.SubchannelClassification + + /** + * Publishes the payload of the MsgEnvelope when the topic of the + * MsgEnvelope starts with the String specified when subscribing. + */ + class SubchannelBusImpl extends EventBus with SubchannelClassification { + type Event = MsgEnvelope + type Classifier = String + type Subscriber = ActorRef + + // Subclassification is an object providing `isEqual` and `isSubclass` + // to be consumed by the other methods of this classifier + override protected val subclassification: Subclassification[Classifier] = + new StartsWithSubclassification + + // is used for extracting the classifier from the incoming events + override protected def classify(event: Event): Classifier = event.topic + + // will be invoked for each event for all subscribers which registered + // themselves for the event’s classifier + override protected def publish(event: Event, subscriber: Subscriber): Unit = { + subscriber ! event.payload + } + } + //#subchannel-bus + + //#scanning-bus + import akka.event.ScanningClassification + + /** + * Publishes String messages with length less than or equal to the length + * specified when subscribing. + */ + class ScanningBusImpl extends EventBus with ScanningClassification { + type Event = String + type Classifier = Int + type Subscriber = ActorRef + + // is needed for determining matching classifiers and storing them in an + // ordered collection + override protected def compareClassifiers(a: Classifier, b: Classifier): Int = + if (a < b) -1 else if (a == b) 0 else 1 + + // is needed for storing subscribers in an ordered collection + override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = + a.compareTo(b) + + // determines whether a given classifier shall match a given event; it is invoked + // for each subscription for all received events, hence the name of the classifier + override protected def matches(classifier: Classifier, event: Event): Boolean = + event.length <= classifier + + // will be invoked for each event for all subscribers which registered themselves + // for a classifier matching this event + override protected def publish(event: Event, subscriber: Subscriber): Unit = { + subscriber ! event + } + } + //#scanning-bus + + //#actor-bus + import akka.event.ActorEventBus + import akka.event.ActorClassification + import akka.event.ActorClassifier + + case class Notification(ref: ActorRef, id: Int) + + class ActorBusImpl extends ActorEventBus with ActorClassifier with ActorClassification { + type Event = Notification + + // is used for extracting the classifier from the incoming events + override protected def classify(event: Event): ActorRef = event.ref + + // determines the initial size of the index data structure + // used internally (i.e. the expected number of different classifiers) + override protected def mapSize: Int = 128 + } + //#actor-bus + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class EventBusDocSpec extends AkkaSpec { + import EventBusDocSpec._ + + "demonstrate LookupClassification" in { + //#lookup-bus-test + val lookupBus = new LookupBusImpl + lookupBus.subscribe(testActor, "greetings") + lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis())) + lookupBus.publish(MsgEnvelope("greetings", "hello")) + expectMsg("hello") + //#lookup-bus-test + } + + "demonstrate SubchannelClassification" in { + //#subchannel-bus-test + val subchannelBus = new SubchannelBusImpl + subchannelBus.subscribe(testActor, "abc") + subchannelBus.publish(MsgEnvelope("xyzabc", "x")) + subchannelBus.publish(MsgEnvelope("bcdef", "b")) + subchannelBus.publish(MsgEnvelope("abc", "c")) + expectMsg("c") + subchannelBus.publish(MsgEnvelope("abcdef", "d")) + expectMsg("d") + //#subchannel-bus-test + } + + "demonstrate ScanningClassification" in { + //#scanning-bus-test + val scanningBus = new ScanningBusImpl + scanningBus.subscribe(testActor, 3) + scanningBus.publish("xyzabc") + scanningBus.publish("ab") + expectMsg("ab") + scanningBus.publish("abc") + expectMsg("abc") + //#scanning-bus-test + } + + "demonstrate ActorClassification" in { + //#actor-bus-test + val observer1 = TestProbe().ref + val observer2 = TestProbe().ref + val probe1 = TestProbe() + val probe2 = TestProbe() + val subscriber1 = probe1.ref + val subscriber2 = probe2.ref + val actorBus = new ActorBusImpl + actorBus.subscribe(subscriber1, observer1) + actorBus.subscribe(subscriber2, observer1) + actorBus.subscribe(subscriber2, observer2) + actorBus.publish(Notification(observer1, 100)) + probe1.expectMsg(Notification(observer1, 100)) + probe2.expectMsg(Notification(observer1, 100)) + actorBus.publish(Notification(observer2, 101)) + probe2.expectMsg(Notification(observer2, 101)) + probe1.expectNoMsg(500.millis) + //#actor-bus-test + } +} diff --git a/akka-docs/rst/scala/event-bus.rst b/akka-docs/rst/scala/event-bus.rst index ee58fc8852..34b4f0e035 100644 --- a/akka-docs/rst/scala/event-bus.rst +++ b/akka-docs/rst/scala/event-bus.rst @@ -1,26 +1,15 @@ .. _event-bus-scala: -################# -Event Bus -################# +########### + Event Bus +########### Originally conceived as a way to send messages to groups of actors, the :class:`EventBus` has been generalized into a set of composable traits implementing a simple interface: -- :meth:`subscribe(subscriber: Subscriber, classifier: Classifier): Boolean` - subscribes the given subscriber to events with the given classifier - -- :meth:`unsubscribe(subscriber: Subscriber, classifier: Classifier): Boolean` - undoes a specific subscription - -- :meth:`unsubscribe(subscriber: Subscriber)` undoes all subscriptions for the - given subscriber - -- :meth:`publish(event: Event)` publishes an event, which first is classified - according to the specific bus (see `Classifiers`_) and then published to all - subscribers for the obtained classifier +.. includecode:: ../../../akka-actor/src/main/scala/akka/event/EventBus.scala#event-bus-api .. note:: @@ -28,9 +17,8 @@ implementing a simple interface: published messages. If you need a reference to the original sender you have to provide it inside the message. -This mechanism is used in different places within Akka, e.g. the -:ref:`DeathWatch ` and the `Event Stream`_. Implementations -can make use of the specific building blocks presented below. +This mechanism is used in different places within Akka, e.g. the `Event Stream`_. +Implementations can make use of the specific building blocks presented below. An event bus must define the following three abstract types: @@ -50,9 +38,7 @@ Classifiers The classifiers presented here are part of the Akka distribution, but rolling your own in case you do not find a perfect match is not difficult, check the -implementation of the existing ones on `github`_. - -.. _github: https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/event/EventBus.scala +implementation of the existing ones on `github <@github@/akka-actor/src/main/scala/akka/event/EventBus.scala>`_ Lookup Classification --------------------- @@ -61,22 +47,15 @@ The simplest classification is just to extract an arbitrary classifier from each event and maintaining a set of subscribers for each possible classifier. This can be compared to tuning in on a radio station. The trait :class:`LookupClassification` is still generic in that it abstracts over how to -compare subscribers and how exactly to classify. The necessary methods to be -implemented are the following: +compare subscribers and how exactly to classify. -- :meth:`classify(event: Event): Classifier` is used for extracting the - classifier from the incoming events. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`compareSubscribers(a: Subscriber, b: Subscriber): Int` must define a - partial order over the subscribers, expressed as expected from - :meth:`java.lang.Comparable.compare`. +.. includecode:: code/docs/event/EventBusDocSpec.scala#lookup-bus -- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for - each event for all subscribers which registered themselves for the event’s - classifier. +A test for this implementation may look like this: -- :meth:`mapSize: Int` determines the initial size of the index data structure - used internally (i.e. the expected number of different classifiers). +.. includecode:: code/docs/event/EventBusDocSpec.scala#lookup-bus-test This classifier is efficient in case no subscribers exist for a particular event. @@ -89,19 +68,15 @@ can be compared to tuning in on (possibly multiple) radio channels by genre. This classification has been developed for the case where the classifier is just the JVM class of the event and subscribers may be interested in subscribing to all subclasses of a certain class, but it may be used with any -classifier hierarchy. The abstract members needed by this classifier are +classifier hierarchy. -- :obj:`subclassification: Subclassification[Classifier]` is an object - providing :meth:`isEqual(a: Classifier, b: Classifier)` and - :meth:`isSubclass(a: Classifier, b: Classifier)` to be consumed by the other - methods of this classifier. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`classify(event: Event): Classifier` is used for extracting the - classifier from the incoming events. +.. includecode:: code/docs/event/EventBusDocSpec.scala#subchannel-bus -- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for - each event for all subscribers which registered themselves for the event’s - classifier. +A test for this implementation may look like this: + +.. includecode:: code/docs/event/EventBusDocSpec.scala#subchannel-bus-test This classifier is also efficient in case no subscribers are found for an event, but it uses conventional locking to synchronize an internal classifier @@ -117,21 +92,14 @@ strictly hierarchical, this classifier is useful if there are overlapping classifiers which cover various parts of the event space without forming a hierarchy. It can be compared to tuning in on (possibly multiple) radio stations by geographical reachability (for old-school radio-wave transmission). -The abstract members for this classifier are: -- :meth:`compareClassifiers(a: Classifier, b: Classifier): Int` is needed for - determining matching classifiers and storing them in an ordered collection. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`compareSubscribers(a: Subscriber, b: Subscriber): Int` is needed for - storing subscribers in an ordered collection. +.. includecode:: code/docs/event/EventBusDocSpec.scala#scanning-bus -- :meth:`matches(classifier: Classifier, event: Event): Boolean` determines - whether a given classifier shall match a given event; it is invoked for each - subscription for all received events, hence the name of the classifier. +A test for this implementation may look like this: -- :meth:`publish(event: Event, subscriber: Subscriber)` will be invoked for - each event for all subscribers which registered themselves for a classifier - matching this event. +.. includecode:: code/docs/event/EventBusDocSpec.scala#scanning-bus-test This classifier takes always a time which is proportional to the number of subscriptions, independent of how many actually match. @@ -139,15 +107,17 @@ subscriptions, independent of how many actually match. Actor Classification -------------------- -This classification has been developed specifically for implementing +This classification was originally developed specifically for implementing :ref:`DeathWatch `: subscribers as well as classifiers are of -type :class:`ActorRef`. The abstract members are +type :class:`ActorRef`. -- :meth:`classify(event: Event): ActorRef` is used for extracting the - classifier from the incoming events. +The necessary methods to be implemented are illustrated with the following example: -- :meth:`mapSize: Int` determines the initial size of the index data structure - used internally (i.e. the expected number of different classifiers). +.. includecode:: code/docs/event/EventBusDocSpec.scala#actor-bus + +A test for this implementation may look like this: + +.. includecode:: code/docs/event/EventBusDocSpec.scala#actor-bus-test This classifier is still is generic in the event type, and it is efficient for all use cases. @@ -161,7 +131,7 @@ The event stream is the main event bus of each actor system: it is used for carrying :ref:`log messages ` and `Dead Letters`_ and may be used by the user code for other purposes as well. It uses `Subchannel Classification`_ which enables registering to related sets of channels (as is -used for :class:`RemoteLifeCycleMessage`). The following example demonstrates +used for :class:`RemotingLifecycleEvent`). The following example demonstrates how a simple subscription works: .. includecode:: code/docs/event/LoggingDocSpec.scala#deadletters diff --git a/akka-docs/rst/scala/fsm.rst b/akka-docs/rst/scala/fsm.rst index 5932e838d1..990528d791 100644 --- a/akka-docs/rst/scala/fsm.rst +++ b/akka-docs/rst/scala/fsm.rst @@ -481,5 +481,5 @@ Examples A bigger FSM example contrasted with Actor's :meth:`become`/:meth:`unbecome` can be found in the sources: - * `Dining Hakkers using FSM `_ - * `Dining Hakkers using become `_ + * `Dining Hakkers using FSM <@github@/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala>`_ + * `Dining Hakkers using become <@github@/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala>`_ diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala index f844403951..e1ec1450ac 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala @@ -27,8 +27,8 @@ class RemoteConsistentHashingRouterSpec extends AkkaSpec(""" val consistentHash1 = ConsistentHash(nodes1, 10) val consistentHash2 = ConsistentHash(nodes2, 10) val keys = List("A", "B", "C", "D", "E", "F", "G") - val result1 = keys collect { case k => consistentHash1.nodeFor(k).routee } - val result2 = keys collect { case k => consistentHash2.nodeFor(k).routee } + val result1 = keys collect { case k ⇒ consistentHash1.nodeFor(k).routee } + val result2 = keys collect { case k ⇒ consistentHash2.nodeFor(k).routee } result1 should be(result2) }