+act add getEventStream to ActorSystem #25870

This commit is contained in:
kerr 2018-11-09 19:29:05 +08:00 committed by Johan Andrén
parent f66ee1cbe8
commit e847ce016a
6 changed files with 26 additions and 21 deletions

View file

@ -50,7 +50,7 @@ public class LoggingAdapterTest extends JUnitSuite {
public void mustFormatMessage() {
final LoggingAdapter log = Logging.getLogger(system, this);
new LogJavaTestKit(system) {{
system.eventStream().subscribe(getRef(), LogEvent.class);
system.getEventStream().subscribe(getRef(), LogEvent.class);
log.error("One arg message: {}", "the arg");
expectLog(ErrorLevel(), "One arg message: the arg");
@ -82,7 +82,7 @@ public class LoggingAdapterTest extends JUnitSuite {
@Test
public void mustCallMdcForEveryLog() throws Exception {
new LogJavaTestKit(system) {{
system.eventStream().subscribe(getRef(), LogEvent.class);
system.getEventStream().subscribe(getRef(), LogEvent.class);
ActorRef ref = system.actorOf(Props.create(ActorWithMDC.class));
ref.tell(new Log(ErrorLevel(), "An Error"), system.deadLetters());
@ -101,7 +101,7 @@ public class LoggingAdapterTest extends JUnitSuite {
@Test
public void mustSupportMdcNull() throws Exception {
new LogJavaTestKit(system) {{
system.eventStream().subscribe(getRef(), LogEvent.class);
system.getEventStream().subscribe(getRef(), LogEvent.class);
ActorRef ref = system.actorOf(Props.create(ActorWithMDC.class));
ref.tell(new Log(InfoLevel(), "Null MDC"), system.deadLetters());

View file

@ -307,16 +307,16 @@ public class AdapterTest extends JUnitSuite {
akka.actor.ActorRef ignore = system.actorOf(akka.actor.Props.empty());
ActorRef<String> typedRef = Adapter.spawnAnonymous(system, Typed1.create(ignore, probe.getRef()));
int originalLogLevel = system.eventStream().logLevel();
int originalLogLevel = system.getEventStream().logLevel();
try {
// suppress the logging with stack trace
system.eventStream().setLogLevel(Integer.MIN_VALUE); // OFF
system.getEventStream().setLogLevel(Integer.MIN_VALUE); // OFF
// only stop supervisorStrategy
typedRef.tell("supervise-stop");
probe.expectMsg("terminated");
} finally {
system.eventStream().setLogLevel(originalLogLevel);
system.getEventStream().setLogLevel(originalLogLevel);
}
probe.expectNoMessage(Duration.ofMillis(100)); // no pong
}

View file

@ -467,6 +467,11 @@ abstract class ActorSystem extends ActorRefFactory {
*/
def eventStream: EventStream
/**
* Java API: Main event bus of this actor system, used for example for logging.
*/
def getEventStream: EventStream = eventStream
/**
* Convenient logging adapter for logging to the [[ActorSystem#eventStream]].
*/

View file

@ -159,7 +159,7 @@ public class FaultHandlingTest extends AbstractJavaTest {
EventFilter ex3 = new ErrorFilter(IllegalArgumentException.class);
EventFilter ex4 = new ErrorFilter(Exception.class);
EventFilter[] ignoreExceptions = { ex1, ex2, ex3, ex4 };
system.eventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
system.getEventStream().publish(new TestEvent.Mute(immutableSeq(ignoreExceptions)));
//#create
Props superprops = Props.create(Supervisor.class);

View file

@ -59,7 +59,7 @@ public class LoggingDocTest extends AbstractJavaTest {
//#deadletters
final ActorSystem system = ActorSystem.create("DeadLetters");
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
system.eventStream().subscribe(actor, DeadLetter.class);
system.getEventStream().subscribe(actor, DeadLetter.class);
//#deadletters
TestKit.shutdownActorSystem(system);
}
@ -100,18 +100,18 @@ public class LoggingDocTest extends AbstractJavaTest {
final ActorSystem system = ActorSystem.create("DeadLetters");
//#superclass-subscription-eventstream
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
system.eventStream().subscribe(actor, DeadLetter.class);
system.getEventStream().subscribe(actor, DeadLetter.class);
final ActorRef jazzListener = system.actorOf(Props.create(Listener.class));
final ActorRef musicListener = system.actorOf(Props.create(Listener.class));
system.eventStream().subscribe(jazzListener, Jazz.class);
system.eventStream().subscribe(musicListener, AllKindsOfMusic.class);
system.getEventStream().subscribe(jazzListener, Jazz.class);
system.getEventStream().subscribe(musicListener, AllKindsOfMusic.class);
// only musicListener gets this message, since it listens to *all* kinds of music:
system.eventStream().publish(new Electronic("Parov Stelar"));
system.getEventStream().publish(new Electronic("Parov Stelar"));
// jazzListener and musicListener will be notified about Jazz:
system.eventStream().publish(new Jazz("Sonny Rollins"));
system.getEventStream().publish(new Jazz("Sonny Rollins"));
//#superclass-subscription-eventstream
TestKit.shutdownActorSystem(system);
@ -123,7 +123,7 @@ public class LoggingDocTest extends AbstractJavaTest {
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
//#suppressed-deadletters
system.eventStream().subscribe(actor, SuppressedDeadLetter.class);
system.getEventStream().subscribe(actor, SuppressedDeadLetter.class);
//#suppressed-deadletters
TestKit.shutdownActorSystem(system);
@ -134,7 +134,7 @@ public class LoggingDocTest extends AbstractJavaTest {
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
//#all-deadletters
system.eventStream().subscribe(actor, AllDeadLetters.class);
system.getEventStream().subscribe(actor, AllDeadLetters.class);
//#all-deadletters
TestKit.shutdownActorSystem(system);

View file

@ -99,7 +99,7 @@ class ExamplePersistentActor extends AbstractPersistentActor {
final Evt evt = new Evt(data + "-" + getNumEvents());
persist(evt, (Evt e) -> {
state.update(e);
getContext().getSystem().eventStream().publish(e);
getContext().getSystem().getEventStream().publish(e);
if (lastSequenceNr() % snapShotInterval == 0 && lastSequenceNr() != 0)
// IMPORTANT: create a copy of snapshot because ExampleState is mutable
saveSnapshot(state.copy());