diff --git a/akka-docs/rst/java/code/docs/event/LoggingDocTest.java b/akka-docs/rst/java/code/docs/event/LoggingDocTest.java index 7511faaff5..c1f5bed2c4 100644 --- a/akka-docs/rst/java/code/docs/event/LoggingDocTest.java +++ b/akka-docs/rst/java/code/docs/event/LoggingDocTest.java @@ -4,10 +4,7 @@ package docs.event; //#imports -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.AllDeadLetters; -import akka.actor.SuppressedDeadLetter; +import akka.actor.*; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -34,15 +31,12 @@ import java.util.Map; //#imports-mdc //#imports-deadletter -import akka.actor.Props; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.UntypedActor; -import akka.actor.DeadLetter; //#imports-deadletter public class LoggingDocTest { - + @Test public void useLoggingActor() { ActorSystem system = ActorSystem.create("MySystem"); @@ -69,11 +63,61 @@ public class LoggingDocTest { JavaTestKit.shutdownActorSystem(system); } + //#superclass-subscription-eventstream + interface AllKindsOfMusic { } + + class Jazz implements AllKindsOfMusic { + final public String artist; + public Jazz(String artist) { + this.artist = artist; + } + } + class Electronic implements AllKindsOfMusic { + final public String artist; + public Electronic(String artist) { + this.artist = artist; + } + } + + class Listener extends UntypedActor { + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof Jazz) { + System.out.printf("%s is listening to: %s%n", self().path().name(), message); + } else if (message instanceof Electronic) { + System.out.printf("%s is listening to: %s%n", self().path().name(), message); + } + } + } + //#superclass-subscription-eventstream + + @Test + public void subscribeBySubclassification() { + final ActorSystem system = ActorSystem.create("DeadLetters"); + //#superclass-subscription-eventstream + final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class)); + system.eventStream().subscribe(actor, DeadLetter.class); + + final ActorRef jazzListener = system.actorOf(Props.create(Listener.class)); + final ActorRef musicListener = system.actorOf(Props.create(Listener.class)); + system.eventStream().subscribe(jazzListener, Jazz.class); + system.eventStream().subscribe(musicListener, AllKindsOfMusic.class); + + // only musicListener gets this message, since it listens to *all* kinds of music: + system.eventStream().publish(new Electronic("Parov Stelar")); + + // jazzListener and musicListener will be notified about Jazz: + system.eventStream().publish(new Jazz("Sonny Rollins")); + + //#superclass-subscription-eventstream + JavaTestKit.shutdownActorSystem(system); + } + @Test public void subscribeToSuppressedDeadLetters() { final ActorSystem system = ActorSystem.create("SuppressedDeadLetters"); final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class)); - + //#suppressed-deadletters system.eventStream().subscribe(actor, SuppressedDeadLetter.class); //#suppressed-deadletters diff --git a/akka-docs/rst/java/event-bus.rst b/akka-docs/rst/java/event-bus.rst index cd7b09eb54..30c2751f6a 100644 --- a/akka-docs/rst/java/event-bus.rst +++ b/akka-docs/rst/java/event-bus.rst @@ -148,6 +148,12 @@ it can be subscribed like this: .. includecode:: code/docs/event/LoggingDocTest.java#deadletters +It is also worth pointing out that thanks to the way the subchannel classification +is implemented in the event stream, it is possible to subscribe to a group of events, by +subscribing to their common superclass as demonstrated in the following example: + +.. includecode:: code/docs/event/LoggingDocTest.java#superclass-subscription-eventstream + Similarly to `Actor Classification`_, :class:`EventStream` will automatically remove subscribers when they terminate. .. note:: diff --git a/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala b/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala index 7211a64308..d60e757d80 100644 --- a/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala @@ -3,10 +3,8 @@ */ package docs.event -import akka.actor.AllDeadLetters +import akka.actor.{ Actor, Props } import akka.testkit.AkkaSpec -import akka.actor.Actor -import akka.actor.Props object LoggingDocSpec { @@ -77,12 +75,12 @@ object LoggingDocSpec { //#mdc-actor //#my-event-listener + import akka.event.Logging.Debug + import akka.event.Logging.Error + import akka.event.Logging.Info import akka.event.Logging.InitializeLogger import akka.event.Logging.LoggerInitialized - import akka.event.Logging.Error import akka.event.Logging.Warning - import akka.event.Logging.Info - import akka.event.Logging.Debug class MyEventListener extends Actor { def receive = { @@ -96,8 +94,8 @@ object LoggingDocSpec { //#my-event-listener //#my-source - import akka.event.LogSource import akka.actor.ActorSystem + import akka.event.LogSource object MyType { implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { @@ -113,11 +111,12 @@ object LoggingDocSpec { val log = Logging(system, this) } //#my-source + } class LoggingDocSpec extends AkkaSpec { - import LoggingDocSpec.{ MyActor, MdcActor, MdcActorMixin, Req } + import LoggingDocSpec.{ MdcActor, MdcActorMixin, MyActor, Req } "use a logging actor" in { val myActor = system.actorOf(Props[MyActor]) @@ -144,12 +143,42 @@ class LoggingDocSpec extends AkkaSpec { case d: DeadLetter => println(d) } } + val listener = system.actorOf(Props(classOf[Listener], this)) system.eventStream.subscribe(listener, classOf[DeadLetter]) //#deadletters } } + "demonstrate superclass subscriptions on eventStream" in { + def println(s: String) = () + //#superclass-subscription-eventstream + abstract class AllKindsOfMusic { def artist: String } + case class Jazz(artist: String) extends AllKindsOfMusic + case class Electronic(artist: String) extends AllKindsOfMusic + + new AnyRef { + class Listener extends Actor { + def receive = { + case m: Jazz => println(s"${self.path.name} is listening to: ${m.artist}") + case m: Electronic => println(s"${self.path.name} is listening to: ${m.artist}") + } + } + + val jazzListener = system.actorOf(Props(classOf[Listener], this)) + val musicListener = system.actorOf(Props(classOf[Listener], this)) + system.eventStream.subscribe(jazzListener, classOf[Jazz]) + system.eventStream.subscribe(musicListener, classOf[AllKindsOfMusic]) + + // only musicListener gets this message, since it listens to *all* kinds of music: + system.eventStream.publish(Electronic("Parov Stelar")) + + // jazzListener and musicListener will be notified about Jazz: + system.eventStream.publish(Jazz("Sonny Rollins")) + //#superclass-subscription-eventstream + } + } + "allow registration to suppressed dead letters" in { new AnyRef { import akka.actor.Props diff --git a/akka-docs/rst/scala/event-bus.rst b/akka-docs/rst/scala/event-bus.rst index 1d5aa6f71f..780504f681 100644 --- a/akka-docs/rst/scala/event-bus.rst +++ b/akka-docs/rst/scala/event-bus.rst @@ -143,6 +143,12 @@ how a simple subscription works: .. includecode:: code/docs/event/LoggingDocSpec.scala#deadletters +It is also worth pointing out that thanks to the way the subchannel classification +is implemented in the event stream, it is possible to subscribe to a group of events, by +subscribing to their common superclass as demonstrated in the following example: + +.. includecode:: code/docs/event/LoggingDocSpec.scala#superclass-subscription-eventstream + Similarly to `Actor Classification`_, :class:`EventStream` will automatically remove subscribers when they terminate. .. note::