Wrap the system event stream with a typed ActorRef (#26810)

This commit is contained in:
Vasilis Nicolaou 2019-05-20 11:22:11 +01:00 committed by Patrik Nordwall
parent f37f41574d
commit a7c3e2b014
6 changed files with 261 additions and 14 deletions

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.eventstream;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
public class EventStreamTest {
static class SomeClass {}
public static void compileOnlyTest(ActorSystem<?> actorSystem, ActorRef<SomeClass> actorRef) {
actorSystem.eventStream().tell(Subscribe.of(SomeClass.class, actorRef));
actorSystem.eventStream().tell(new Subscribe(SomeClass.class, actorRef));
}
}

View file

@ -0,0 +1,77 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.eventstream
import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import org.scalatest.WordSpecLike
class EventStreamSpec extends ScalaTestWithActorTestKit with WordSpecLike {
import EventStreamSpec._
private final val ShortWait = 100 milli
"system event stream".can {
val eventObjListener: TestProbe[EventObj.type] = testKit.createTestProbe()
val eventClassListener: TestProbe[EventClass] = testKit.createTestProbe()
"register subscribers" in {
testKit.system.eventStream ! Subscribe(eventObjListener.ref)
testKit.system.eventStream ! Subscribe(eventClassListener.ref)
}
"accept published events" in {
testKit.system.eventStream ! Publish(EventObj)
}
"dispatch events to subscribers of that type" in {
eventObjListener.expectMessage(EventObj)
eventClassListener.expectNoMessage(ShortWait)
testKit.system.eventStream ! Publish(EventClass())
eventClassListener.expectMessage(EventClass())
eventObjListener.expectNoMessage(ShortWait)
}
"unsubscribe subscribers" in {
testKit.system.eventStream ! Unsubscribe(eventObjListener.ref)
testKit.system.eventStream ! Publish(EventObj)
eventObjListener.expectNoMessage(ShortWait)
}
}
"a system event stream subscriber" must {
val rootEventListener = testKit.createTestProbe[Root]
val level1EventListener = testKit.createTestProbe[Level1]
val rootEventListenerForLevel1 = testKit.createTestProbe[Root]
testKit.system.eventStream ! Subscribe(rootEventListener.ref)
testKit.system.eventStream ! Subscribe(level1EventListener.ref)
testKit.system.eventStream ! Subscribe[Level1](rootEventListenerForLevel1.ref)
"listen for all subclasses of the events" in {
testKit.system.eventStream ! Publish(Depth1())
rootEventListener.expectMessage(Depth1())
level1EventListener.expectNoMessage(ShortWait)
rootEventListenerForLevel1.expectNoMessage(ShortWait)
testKit.system.eventStream ! Publish(Depth2())
rootEventListener.expectMessage(Depth2())
level1EventListener.expectMessage(Depth2())
rootEventListenerForLevel1.expectMessage(Depth2())
}
}
}
object EventStreamSpec {
case object EventObj
case class EventClass()
sealed trait Root
case class Depth1() extends Root
sealed trait Level1 extends Root
case class Depth2() extends Level1
}

View file

@ -4,26 +4,20 @@
package akka.actor.typed
import java.util.concurrent.CompletionStage
import java.util.concurrent.ThreadFactory
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import akka.Done
import akka.{ actor => untyped }
import akka.actor.BootstrapSetup
import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.internal.adapter.GuardianStartupBehavior
import akka.actor.typed.internal.adapter.PropsAdapter
import akka.actor.typed.internal.{ EventStreamExtension, InternalRecipientRef }
import akka.actor.typed.internal.adapter.{ ActorSystemAdapter, GuardianStartupBehavior, PropsAdapter }
import akka.actor.typed.receptionist.Receptionist
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.util.Helpers.Requiring
import akka.util.Timeout
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.{ Done, actor => untyped }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.{ ExecutionContextExecutor, Future }
/**
* An ActorSystem is home to a hierarchy of Actors. It is created using
@ -164,6 +158,14 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter
*/
def receptionist: ActorRef[Receptionist.Command] =
Receptionist(this).ref
/**
* Main event bus of this actor system, used for example for logging.
* Accepts [[akka.actor.typed.eventstream.Command]].
*/
def eventStream: ActorRef[eventstream.Command] =
EventStreamExtension(this).ref
}
object ActorSystem {

View file

@ -0,0 +1,85 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.eventstream
import akka.actor.typed.ActorRef
import akka.annotation.{ DoNotInherit, InternalApi }
import scala.reflect.ClassTag
/**
* Not for user Extension
*/
@DoNotInherit sealed trait Command
/**
* Publish an event of type E
* @param event
* @tparam E
*/
final case class Publish[E](event: E) extends Command
object Publish {
/**
* Java API.
*/
def of[E](event: E): Publish[E] = apply(event)
}
/**
* Subscribe a typed actor to listen for types or subtypes of E.
* ==Simple example==
* {{{
* sealed trait A
* case object A1 extends A
* //listen for all As
* def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) =
* actorSystem.eventStream ! Subscribe(actorRef)
* //listen for A1s only
* def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) =
* actorSystem.eventStream ! Subscribe[A1](actorRef)
* }}}
*
* @param subscriber
* @param classTag
* @tparam E
*/
final case class Subscribe[E](subscriber: ActorRef[E])(implicit classTag: ClassTag[E]) extends Command {
/**
* Java API.
*/
def this(clazz: Class[E], subscriber: ActorRef[E]) = this(subscriber)(ClassTag(clazz))
/**
* INTERNAL API
*/
@InternalApi private[akka] def topic: Class[_] = classTag.runtimeClass
}
object Subscribe {
/**
* Java API.
*/
def of[E](clazz: java.lang.Class[E], subscriber: ActorRef[E]): Subscribe[E] =
Subscribe(subscriber)(ClassTag(clazz))
}
/**
* Unsubscribe an actor ref from the event stream
* @param subscriber
* @tparam E
*/
final case class Unsubscribe[E](subscriber: ActorRef[E]) extends Command
object Unsubscribe {
/**
* Java API.
*/
def of[E](subscriber: ActorRef[E]): Unsubscribe[E] = apply(subscriber)
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal
import akka.actor.typed.eventstream.Command
import akka.actor.typed.internal.adapter.EventStreamAdapter
import akka.actor.typed._
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
/**
* INTERNAL API
*
* Exposes a typed actor that interacts with the [[akka.actor.ActorSystem.eventStream]].
*
* It is used as an extension to ensure a single instance per actor system.
*/
@InternalApi private[akka] final class EventStreamExtension(actorSystem: ActorSystem[_]) extends Extension {
val ref: ActorRef[Command] =
actorSystem.internalSystemActorOf(EventStreamAdapter.behavior, "eventstream", Props.empty)
}
private[akka] object EventStreamExtension extends ExtensionId[EventStreamExtension] {
override def createExtension(system: ActorSystem[_]): EventStreamExtension = new EventStreamExtension(system)
}

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.adapter
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
/**
* INTERNAL API
* Encapsulates the [[akka.actor.ActorSystem.eventStream]] in a [[Behavior]]
*/
@InternalApi private[akka] object EventStreamAdapter {
private[akka] val behavior: Behavior[Command] =
Behaviors.setup { ctx =>
val eventStream = ctx.system.toUntyped.eventStream
eventStreamBehavior(eventStream)
}
private def eventStreamBehavior(eventStream: akka.event.EventStream): Behavior[Command] =
Behaviors.receiveMessage {
case Publish(event) =>
eventStream.publish(event)
Behaviors.same
case s @ Subscribe(subscriber) =>
eventStream.subscribe(subscriber.toUntyped, s.topic)
Behaviors.same
case Unsubscribe(subscriber) =>
eventStream.unsubscribe(subscriber.toUntyped)
Behaviors.same
}
}