From a7c3e2b0145b691860842baaec7ce18134e1c255 Mon Sep 17 00:00:00 2001 From: Vasilis Nicolaou Date: Mon, 20 May 2019 11:22:11 +0100 Subject: [PATCH] Wrap the system event stream with a typed ActorRef (#26810) --- .../typed/eventstream/EventStreamTest.java | 18 ++++ .../typed/eventstream/EventStreamSpec.scala | 77 +++++++++++++++++ .../scala/akka/actor/typed/ActorSystem.scala | 30 ++++--- .../actor/typed/eventstream/EventStream.scala | 85 +++++++++++++++++++ .../typed/internal/EventStreamExtension.scala | 27 ++++++ .../internal/adapter/EventStreamAdapter.scala | 38 +++++++++ 6 files changed, 261 insertions(+), 14 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/EventStreamTest.java create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/eventstream/EventStream.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/EventStreamExtension.scala create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/EventStreamAdapter.scala diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/EventStreamTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/EventStreamTest.java new file mode 100644 index 0000000000..23a5532284 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/EventStreamTest.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.eventstream; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; + +public class EventStreamTest { + + static class SomeClass {} + + public static void compileOnlyTest(ActorSystem actorSystem, ActorRef actorRef) { + actorSystem.eventStream().tell(Subscribe.of(SomeClass.class, actorRef)); + actorSystem.eventStream().tell(new Subscribe(SomeClass.class, actorRef)); + } +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala new file mode 100644 index 0000000000..21d468f600 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.eventstream + +import scala.concurrent.duration._ + +import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } +import org.scalatest.WordSpecLike + +class EventStreamSpec extends ScalaTestWithActorTestKit with WordSpecLike { + + import EventStreamSpec._ + + private final val ShortWait = 100 milli + + "system event stream".can { + val eventObjListener: TestProbe[EventObj.type] = testKit.createTestProbe() + val eventClassListener: TestProbe[EventClass] = testKit.createTestProbe() + + "register subscribers" in { + testKit.system.eventStream ! Subscribe(eventObjListener.ref) + testKit.system.eventStream ! Subscribe(eventClassListener.ref) + } + + "accept published events" in { + testKit.system.eventStream ! Publish(EventObj) + } + "dispatch events to subscribers of that type" in { + eventObjListener.expectMessage(EventObj) + eventClassListener.expectNoMessage(ShortWait) + testKit.system.eventStream ! Publish(EventClass()) + eventClassListener.expectMessage(EventClass()) + eventObjListener.expectNoMessage(ShortWait) + } + + "unsubscribe subscribers" in { + testKit.system.eventStream ! Unsubscribe(eventObjListener.ref) + testKit.system.eventStream ! Publish(EventObj) + eventObjListener.expectNoMessage(ShortWait) + } + } + + "a system event stream subscriber" must { + val rootEventListener = testKit.createTestProbe[Root] + val level1EventListener = testKit.createTestProbe[Level1] + val rootEventListenerForLevel1 = testKit.createTestProbe[Root] + testKit.system.eventStream ! Subscribe(rootEventListener.ref) + testKit.system.eventStream ! Subscribe(level1EventListener.ref) + testKit.system.eventStream ! Subscribe[Level1](rootEventListenerForLevel1.ref) + "listen for all subclasses of the events" in { + testKit.system.eventStream ! Publish(Depth1()) + rootEventListener.expectMessage(Depth1()) + level1EventListener.expectNoMessage(ShortWait) + rootEventListenerForLevel1.expectNoMessage(ShortWait) + + testKit.system.eventStream ! Publish(Depth2()) + + rootEventListener.expectMessage(Depth2()) + level1EventListener.expectMessage(Depth2()) + rootEventListenerForLevel1.expectMessage(Depth2()) + } + + } + +} + +object EventStreamSpec { + case object EventObj + case class EventClass() + + sealed trait Root + case class Depth1() extends Root + sealed trait Level1 extends Root + case class Depth2() extends Level1 +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 93e06100a8..784ae1a9d9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -4,26 +4,20 @@ package akka.actor.typed -import java.util.concurrent.CompletionStage -import java.util.concurrent.ThreadFactory +import java.util.concurrent.{ CompletionStage, ThreadFactory } -import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.Future -import akka.Done -import akka.{ actor => untyped } import akka.actor.BootstrapSetup import akka.actor.setup.ActorSystemSetup -import akka.actor.typed.internal.InternalRecipientRef -import akka.actor.typed.internal.adapter.ActorSystemAdapter -import akka.actor.typed.internal.adapter.GuardianStartupBehavior -import akka.actor.typed.internal.adapter.PropsAdapter +import akka.actor.typed.internal.{ EventStreamExtension, InternalRecipientRef } +import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, GuardianStartupBehavior, PropsAdapter } import akka.actor.typed.receptionist.Receptionist -import akka.annotation.ApiMayChange -import akka.annotation.DoNotInherit +import akka.annotation.{ ApiMayChange, DoNotInherit } import akka.util.Helpers.Requiring import akka.util.Timeout -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory +import akka.{ Done, actor => untyped } +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.concurrent.{ ExecutionContextExecutor, Future } /** * An ActorSystem is home to a hierarchy of Actors. It is created using @@ -164,6 +158,14 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter */ def receptionist: ActorRef[Receptionist.Command] = Receptionist(this).ref + + /** + * Main event bus of this actor system, used for example for logging. + * Accepts [[akka.actor.typed.eventstream.Command]]. + */ + def eventStream: ActorRef[eventstream.Command] = + EventStreamExtension(this).ref + } object ActorSystem { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/eventstream/EventStream.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/eventstream/EventStream.scala new file mode 100644 index 0000000000..6198ef16eb --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/eventstream/EventStream.scala @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.eventstream + +import akka.actor.typed.ActorRef +import akka.annotation.{ DoNotInherit, InternalApi } + +import scala.reflect.ClassTag + +/** + * Not for user Extension + */ +@DoNotInherit sealed trait Command + +/** + * Publish an event of type E + * @param event + * @tparam E + */ +final case class Publish[E](event: E) extends Command + +object Publish { + + /** + * Java API. + */ + def of[E](event: E): Publish[E] = apply(event) +} + +/** + * Subscribe a typed actor to listen for types or subtypes of E. + * ==Simple example== + * {{{ + * sealed trait A + * case object A1 extends A + * //listen for all As + * def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) = + * actorSystem.eventStream ! Subscribe(actorRef) + * //listen for A1s only + * def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) = + * actorSystem.eventStream ! Subscribe[A1](actorRef) + * }}} + * + * @param subscriber + * @param classTag + * @tparam E + */ +final case class Subscribe[E](subscriber: ActorRef[E])(implicit classTag: ClassTag[E]) extends Command { + + /** + * Java API. + */ + def this(clazz: Class[E], subscriber: ActorRef[E]) = this(subscriber)(ClassTag(clazz)) + + /** + * INTERNAL API + */ + @InternalApi private[akka] def topic: Class[_] = classTag.runtimeClass +} + +object Subscribe { + + /** + * Java API. + */ + def of[E](clazz: java.lang.Class[E], subscriber: ActorRef[E]): Subscribe[E] = + Subscribe(subscriber)(ClassTag(clazz)) +} + +/** + * Unsubscribe an actor ref from the event stream + * @param subscriber + * @tparam E + */ +final case class Unsubscribe[E](subscriber: ActorRef[E]) extends Command + +object Unsubscribe { + + /** + * Java API. + */ + def of[E](subscriber: ActorRef[E]): Unsubscribe[E] = apply(subscriber) +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/EventStreamExtension.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/EventStreamExtension.scala new file mode 100644 index 0000000000..c1f93423b4 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/EventStreamExtension.scala @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal + +import akka.actor.typed.eventstream.Command +import akka.actor.typed.internal.adapter.EventStreamAdapter +import akka.actor.typed._ +import akka.actor.typed.scaladsl.adapter._ +import akka.annotation.InternalApi + +/** + * INTERNAL API + * + * Exposes a typed actor that interacts with the [[akka.actor.ActorSystem.eventStream]]. + * + * It is used as an extension to ensure a single instance per actor system. + */ +@InternalApi private[akka] final class EventStreamExtension(actorSystem: ActorSystem[_]) extends Extension { + val ref: ActorRef[Command] = + actorSystem.internalSystemActorOf(EventStreamAdapter.behavior, "eventstream", Props.empty) +} + +private[akka] object EventStreamExtension extends ExtensionId[EventStreamExtension] { + override def createExtension(system: ActorSystem[_]): EventStreamExtension = new EventStreamExtension(system) +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/EventStreamAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/EventStreamAdapter.scala new file mode 100644 index 0000000000..539bda004d --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/EventStreamAdapter.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.adapter + +import akka.actor.typed.Behavior +import akka.actor.typed.eventstream._ +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ +import akka.annotation.InternalApi + +/** + * INTERNAL API + * Encapsulates the [[akka.actor.ActorSystem.eventStream]] in a [[Behavior]] + */ +@InternalApi private[akka] object EventStreamAdapter { + + private[akka] val behavior: Behavior[Command] = + Behaviors.setup { ctx => + val eventStream = ctx.system.toUntyped.eventStream + eventStreamBehavior(eventStream) + } + + private def eventStreamBehavior(eventStream: akka.event.EventStream): Behavior[Command] = + Behaviors.receiveMessage { + case Publish(event) => + eventStream.publish(event) + Behaviors.same + case s @ Subscribe(subscriber) => + eventStream.subscribe(subscriber.toUntyped, s.topic) + Behaviors.same + case Unsubscribe(subscriber) => + eventStream.unsubscribe(subscriber.toUntyped) + Behaviors.same + } + +}