+doc #17877 document filtering for subclasses in eventStream
This commit is contained in:
parent
70403446ce
commit
0e8dd3a7f4
4 changed files with 102 additions and 17 deletions
|
|
@ -4,10 +4,7 @@
|
||||||
package docs.event;
|
package docs.event;
|
||||||
|
|
||||||
//#imports
|
//#imports
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.*;
|
||||||
import akka.actor.ActorSystem;
|
|
||||||
import akka.actor.AllDeadLetters;
|
|
||||||
import akka.actor.SuppressedDeadLetter;
|
|
||||||
import akka.event.Logging;
|
import akka.event.Logging;
|
||||||
import akka.event.LoggingAdapter;
|
import akka.event.LoggingAdapter;
|
||||||
|
|
||||||
|
|
@ -34,15 +31,12 @@ import java.util.Map;
|
||||||
//#imports-mdc
|
//#imports-mdc
|
||||||
|
|
||||||
//#imports-deadletter
|
//#imports-deadletter
|
||||||
import akka.actor.Props;
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
import akka.actor.UntypedActor;
|
|
||||||
import akka.actor.DeadLetter;
|
|
||||||
//#imports-deadletter
|
//#imports-deadletter
|
||||||
|
|
||||||
public class LoggingDocTest {
|
public class LoggingDocTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void useLoggingActor() {
|
public void useLoggingActor() {
|
||||||
ActorSystem system = ActorSystem.create("MySystem");
|
ActorSystem system = ActorSystem.create("MySystem");
|
||||||
|
|
@ -69,11 +63,61 @@ public class LoggingDocTest {
|
||||||
JavaTestKit.shutdownActorSystem(system);
|
JavaTestKit.shutdownActorSystem(system);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#superclass-subscription-eventstream
|
||||||
|
interface AllKindsOfMusic { }
|
||||||
|
|
||||||
|
class Jazz implements AllKindsOfMusic {
|
||||||
|
final public String artist;
|
||||||
|
public Jazz(String artist) {
|
||||||
|
this.artist = artist;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
class Electronic implements AllKindsOfMusic {
|
||||||
|
final public String artist;
|
||||||
|
public Electronic(String artist) {
|
||||||
|
this.artist = artist;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Listener extends UntypedActor {
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object message) throws Exception {
|
||||||
|
if (message instanceof Jazz) {
|
||||||
|
System.out.printf("%s is listening to: %s%n", self().path().name(), message);
|
||||||
|
} else if (message instanceof Electronic) {
|
||||||
|
System.out.printf("%s is listening to: %s%n", self().path().name(), message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#superclass-subscription-eventstream
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void subscribeBySubclassification() {
|
||||||
|
final ActorSystem system = ActorSystem.create("DeadLetters");
|
||||||
|
//#superclass-subscription-eventstream
|
||||||
|
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
|
||||||
|
system.eventStream().subscribe(actor, DeadLetter.class);
|
||||||
|
|
||||||
|
final ActorRef jazzListener = system.actorOf(Props.create(Listener.class));
|
||||||
|
final ActorRef musicListener = system.actorOf(Props.create(Listener.class));
|
||||||
|
system.eventStream().subscribe(jazzListener, Jazz.class);
|
||||||
|
system.eventStream().subscribe(musicListener, AllKindsOfMusic.class);
|
||||||
|
|
||||||
|
// only musicListener gets this message, since it listens to *all* kinds of music:
|
||||||
|
system.eventStream().publish(new Electronic("Parov Stelar"));
|
||||||
|
|
||||||
|
// jazzListener and musicListener will be notified about Jazz:
|
||||||
|
system.eventStream().publish(new Jazz("Sonny Rollins"));
|
||||||
|
|
||||||
|
//#superclass-subscription-eventstream
|
||||||
|
JavaTestKit.shutdownActorSystem(system);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void subscribeToSuppressedDeadLetters() {
|
public void subscribeToSuppressedDeadLetters() {
|
||||||
final ActorSystem system = ActorSystem.create("SuppressedDeadLetters");
|
final ActorSystem system = ActorSystem.create("SuppressedDeadLetters");
|
||||||
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
|
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
|
||||||
|
|
||||||
//#suppressed-deadletters
|
//#suppressed-deadletters
|
||||||
system.eventStream().subscribe(actor, SuppressedDeadLetter.class);
|
system.eventStream().subscribe(actor, SuppressedDeadLetter.class);
|
||||||
//#suppressed-deadletters
|
//#suppressed-deadletters
|
||||||
|
|
|
||||||
|
|
@ -148,6 +148,12 @@ it can be subscribed like this:
|
||||||
|
|
||||||
.. includecode:: code/docs/event/LoggingDocTest.java#deadletters
|
.. includecode:: code/docs/event/LoggingDocTest.java#deadletters
|
||||||
|
|
||||||
|
It is also worth pointing out that thanks to the way the subchannel classification
|
||||||
|
is implemented in the event stream, it is possible to subscribe to a group of events, by
|
||||||
|
subscribing to their common superclass as demonstrated in the following example:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/event/LoggingDocTest.java#superclass-subscription-eventstream
|
||||||
|
|
||||||
Similarly to `Actor Classification`_, :class:`EventStream` will automatically remove subscribers when they terminate.
|
Similarly to `Actor Classification`_, :class:`EventStream` will automatically remove subscribers when they terminate.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,8 @@
|
||||||
*/
|
*/
|
||||||
package docs.event
|
package docs.event
|
||||||
|
|
||||||
import akka.actor.AllDeadLetters
|
import akka.actor.{ Actor, Props }
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.actor.Actor
|
|
||||||
import akka.actor.Props
|
|
||||||
|
|
||||||
object LoggingDocSpec {
|
object LoggingDocSpec {
|
||||||
|
|
||||||
|
|
@ -77,12 +75,12 @@ object LoggingDocSpec {
|
||||||
//#mdc-actor
|
//#mdc-actor
|
||||||
|
|
||||||
//#my-event-listener
|
//#my-event-listener
|
||||||
|
import akka.event.Logging.Debug
|
||||||
|
import akka.event.Logging.Error
|
||||||
|
import akka.event.Logging.Info
|
||||||
import akka.event.Logging.InitializeLogger
|
import akka.event.Logging.InitializeLogger
|
||||||
import akka.event.Logging.LoggerInitialized
|
import akka.event.Logging.LoggerInitialized
|
||||||
import akka.event.Logging.Error
|
|
||||||
import akka.event.Logging.Warning
|
import akka.event.Logging.Warning
|
||||||
import akka.event.Logging.Info
|
|
||||||
import akka.event.Logging.Debug
|
|
||||||
|
|
||||||
class MyEventListener extends Actor {
|
class MyEventListener extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
@ -96,8 +94,8 @@ object LoggingDocSpec {
|
||||||
//#my-event-listener
|
//#my-event-listener
|
||||||
|
|
||||||
//#my-source
|
//#my-source
|
||||||
import akka.event.LogSource
|
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
import akka.event.LogSource
|
||||||
|
|
||||||
object MyType {
|
object MyType {
|
||||||
implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
|
implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
|
||||||
|
|
@ -113,11 +111,12 @@ object LoggingDocSpec {
|
||||||
val log = Logging(system, this)
|
val log = Logging(system, this)
|
||||||
}
|
}
|
||||||
//#my-source
|
//#my-source
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class LoggingDocSpec extends AkkaSpec {
|
class LoggingDocSpec extends AkkaSpec {
|
||||||
|
|
||||||
import LoggingDocSpec.{ MyActor, MdcActor, MdcActorMixin, Req }
|
import LoggingDocSpec.{ MdcActor, MdcActorMixin, MyActor, Req }
|
||||||
|
|
||||||
"use a logging actor" in {
|
"use a logging actor" in {
|
||||||
val myActor = system.actorOf(Props[MyActor])
|
val myActor = system.actorOf(Props[MyActor])
|
||||||
|
|
@ -144,12 +143,42 @@ class LoggingDocSpec extends AkkaSpec {
|
||||||
case d: DeadLetter => println(d)
|
case d: DeadLetter => println(d)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val listener = system.actorOf(Props(classOf[Listener], this))
|
val listener = system.actorOf(Props(classOf[Listener], this))
|
||||||
system.eventStream.subscribe(listener, classOf[DeadLetter])
|
system.eventStream.subscribe(listener, classOf[DeadLetter])
|
||||||
//#deadletters
|
//#deadletters
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"demonstrate superclass subscriptions on eventStream" in {
|
||||||
|
def println(s: String) = ()
|
||||||
|
//#superclass-subscription-eventstream
|
||||||
|
abstract class AllKindsOfMusic { def artist: String }
|
||||||
|
case class Jazz(artist: String) extends AllKindsOfMusic
|
||||||
|
case class Electronic(artist: String) extends AllKindsOfMusic
|
||||||
|
|
||||||
|
new AnyRef {
|
||||||
|
class Listener extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case m: Jazz => println(s"${self.path.name} is listening to: ${m.artist}")
|
||||||
|
case m: Electronic => println(s"${self.path.name} is listening to: ${m.artist}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val jazzListener = system.actorOf(Props(classOf[Listener], this))
|
||||||
|
val musicListener = system.actorOf(Props(classOf[Listener], this))
|
||||||
|
system.eventStream.subscribe(jazzListener, classOf[Jazz])
|
||||||
|
system.eventStream.subscribe(musicListener, classOf[AllKindsOfMusic])
|
||||||
|
|
||||||
|
// only musicListener gets this message, since it listens to *all* kinds of music:
|
||||||
|
system.eventStream.publish(Electronic("Parov Stelar"))
|
||||||
|
|
||||||
|
// jazzListener and musicListener will be notified about Jazz:
|
||||||
|
system.eventStream.publish(Jazz("Sonny Rollins"))
|
||||||
|
//#superclass-subscription-eventstream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"allow registration to suppressed dead letters" in {
|
"allow registration to suppressed dead letters" in {
|
||||||
new AnyRef {
|
new AnyRef {
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
|
|
|
||||||
|
|
@ -143,6 +143,12 @@ how a simple subscription works:
|
||||||
|
|
||||||
.. includecode:: code/docs/event/LoggingDocSpec.scala#deadletters
|
.. includecode:: code/docs/event/LoggingDocSpec.scala#deadletters
|
||||||
|
|
||||||
|
It is also worth pointing out that thanks to the way the subchannel classification
|
||||||
|
is implemented in the event stream, it is possible to subscribe to a group of events, by
|
||||||
|
subscribing to their common superclass as demonstrated in the following example:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/event/LoggingDocSpec.scala#superclass-subscription-eventstream
|
||||||
|
|
||||||
Similarly to `Actor Classification`_, :class:`EventStream` will automatically remove subscribers when they terminate.
|
Similarly to `Actor Classification`_, :class:`EventStream` will automatically remove subscribers when they terminate.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue