* Remove ActorContext parameter from javadsl.ReceiveBuilder, #27120 * functional style in javadsl * in Java it's more practical to have an enclosing class to hold initialization parameters and ActorContext * writing behaviors as pure static methods will be unlikely be used in Java * it's still possible to write behaviors as static methods by passing the context around, in same way as all other things * better to embrace the enclosing class pattern and therefore remove the context parameter from the message handlers * style cleanup of ChatRoom sample * migration guide
This commit is contained in:
parent
f004a574b4
commit
893bd8b74b
25 changed files with 616 additions and 561 deletions
|
|
@ -130,95 +130,98 @@ public class BehaviorTestKitTest extends JUnitSuite {
|
||||||
private static Props props = Props.empty().withDispatcherFromConfig("cat");
|
private static Props props = Props.empty().withDispatcherFromConfig("cat");
|
||||||
|
|
||||||
private static Behavior<Command> behavior =
|
private static Behavior<Command> behavior =
|
||||||
Behaviors.receive(Command.class)
|
Behaviors.setup(
|
||||||
.onMessage(
|
context -> {
|
||||||
SpawnChildren.class,
|
return Behaviors.receive(Command.class)
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
IntStream.range(0, message.numberOfChildren)
|
SpawnChildren.class,
|
||||||
.forEach(
|
message -> {
|
||||||
i -> {
|
IntStream.range(0, message.numberOfChildren)
|
||||||
context.spawn(childInitial, "child" + i);
|
.forEach(
|
||||||
});
|
i -> {
|
||||||
return Behaviors.same();
|
context.spawn(childInitial, "child" + i);
|
||||||
})
|
});
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SpawnChildrenAnonymous.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
IntStream.range(0, message.numberOfChildren)
|
SpawnChildrenAnonymous.class,
|
||||||
.forEach(
|
message -> {
|
||||||
i -> {
|
IntStream.range(0, message.numberOfChildren)
|
||||||
context.spawnAnonymous(childInitial);
|
.forEach(
|
||||||
});
|
i -> {
|
||||||
return Behaviors.same();
|
context.spawnAnonymous(childInitial);
|
||||||
})
|
});
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SpawnChildrenWithProps.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
IntStream.range(0, message.numberOfChildren)
|
SpawnChildrenWithProps.class,
|
||||||
.forEach(
|
message -> {
|
||||||
i -> {
|
IntStream.range(0, message.numberOfChildren)
|
||||||
context.spawn(childInitial, "child" + i, message.props);
|
.forEach(
|
||||||
});
|
i -> {
|
||||||
return Behaviors.same();
|
context.spawn(childInitial, "child" + i, message.props);
|
||||||
})
|
});
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SpawnChildrenAnonymousWithProps.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
IntStream.range(0, message.numberOfChildren)
|
SpawnChildrenAnonymousWithProps.class,
|
||||||
.forEach(
|
message -> {
|
||||||
i -> {
|
IntStream.range(0, message.numberOfChildren)
|
||||||
context.spawnAnonymous(childInitial, message.props);
|
.forEach(
|
||||||
});
|
i -> {
|
||||||
return Behaviors.same();
|
context.spawnAnonymous(childInitial, message.props);
|
||||||
})
|
});
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
CreateMessageAdapter.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.messageAdapter(message.clazz, message.f);
|
CreateMessageAdapter.class,
|
||||||
return Behaviors.same();
|
message -> {
|
||||||
})
|
context.messageAdapter(message.clazz, message.f);
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SpawnWatchAndUnWatch.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
ActorRef<Action> c = context.spawn(childInitial, message.name);
|
SpawnWatchAndUnWatch.class,
|
||||||
context.watch(c);
|
message -> {
|
||||||
context.unwatch(c);
|
ActorRef<Action> c = context.spawn(childInitial, message.name);
|
||||||
return Behaviors.same();
|
context.watch(c);
|
||||||
})
|
context.unwatch(c);
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SpawnAndWatchWith.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
ActorRef<Action> c = context.spawn(childInitial, message.name);
|
SpawnAndWatchWith.class,
|
||||||
context.watchWith(c, message);
|
message -> {
|
||||||
return Behaviors.same();
|
ActorRef<Action> c = context.spawn(childInitial, message.name);
|
||||||
})
|
context.watchWith(c, message);
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SpawnSession.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
ActorRef<String> session =
|
SpawnSession.class,
|
||||||
context.spawnAnonymous(
|
message -> {
|
||||||
Behaviors.receiveMessage(
|
ActorRef<String> session =
|
||||||
m -> {
|
context.spawnAnonymous(
|
||||||
message.sessionHandler.tell(m);
|
Behaviors.receiveMessage(
|
||||||
return Behaviors.same();
|
m -> {
|
||||||
}));
|
message.sessionHandler.tell(m);
|
||||||
message.replyTo.tell(session);
|
return Behaviors.same();
|
||||||
return Behaviors.same();
|
}));
|
||||||
})
|
message.replyTo.tell(session);
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
KillSession.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.stop(message.session);
|
KillSession.class,
|
||||||
message.replyTo.tell(Done.getInstance());
|
message -> {
|
||||||
return Behaviors.same();
|
context.stop(message.session);
|
||||||
})
|
message.replyTo.tell(Done.getInstance());
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
Log.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.getLog().info(message.what);
|
Log.class,
|
||||||
return Behaviors.same();
|
message -> {
|
||||||
})
|
context.getLog().info(message.what);
|
||||||
.build();
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.build();
|
||||||
|
});
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void allowAssertionsOnEffectType() {
|
public void allowAssertionsOnEffectType() {
|
||||||
|
|
|
||||||
|
|
@ -67,47 +67,49 @@ public class SyncTestingExampleTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Behavior<Command> myBehavior =
|
public static Behavior<Command> myBehavior =
|
||||||
Behaviors.receive(Command.class)
|
Behaviors.setup(
|
||||||
.onMessage(
|
context ->
|
||||||
CreateAChild.class,
|
Behaviors.receive(Command.class)
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.spawn(childActor, message.childName);
|
CreateAChild.class,
|
||||||
return Behaviors.same();
|
message -> {
|
||||||
})
|
context.spawn(childActor, message.childName);
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
CreateAnAnonymousChild.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.spawnAnonymous(childActor);
|
CreateAnAnonymousChild.class,
|
||||||
return Behaviors.same();
|
message -> {
|
||||||
})
|
context.spawnAnonymous(childActor);
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SayHelloToChild.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
ActorRef<String> child = context.spawn(childActor, message.childName);
|
SayHelloToChild.class,
|
||||||
child.tell("hello");
|
message -> {
|
||||||
return Behaviors.same();
|
ActorRef<String> child = context.spawn(childActor, message.childName);
|
||||||
})
|
child.tell("hello");
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SayHelloToAnonymousChild.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
ActorRef<String> child = context.spawnAnonymous(childActor);
|
SayHelloToAnonymousChild.class,
|
||||||
child.tell("hello stranger");
|
message -> {
|
||||||
return Behaviors.same();
|
ActorRef<String> child = context.spawnAnonymous(childActor);
|
||||||
})
|
child.tell("hello stranger");
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
SayHello.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
message.who.tell("hello");
|
SayHello.class,
|
||||||
return Behaviors.same();
|
message -> {
|
||||||
})
|
message.who.tell("hello");
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
LogAndSayHello.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.getLog().info("Saying hello to {}", message.who.path().name());
|
LogAndSayHello.class,
|
||||||
message.who.tell("hello");
|
message -> {
|
||||||
return Behaviors.same();
|
context.getLog().info("Saying hello to {}", message.who.path().name());
|
||||||
})
|
message.who.tell("hello");
|
||||||
.build();
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.build());
|
||||||
// #under-test
|
// #under-test
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -69,21 +69,23 @@ public class ActorLoggingTest extends JUnitSuite {
|
||||||
@Test
|
@Test
|
||||||
public void loggingProvidesMDC() {
|
public void loggingProvidesMDC() {
|
||||||
Behavior<Protocol> behavior =
|
Behavior<Protocol> behavior =
|
||||||
Behaviors.withMdc(
|
Behaviors.setup(
|
||||||
null,
|
context ->
|
||||||
(message) -> {
|
Behaviors.withMdc(
|
||||||
Map<String, Object> mdc = new HashMap<>();
|
null,
|
||||||
mdc.put("txId", message.getTransactionId());
|
(message) -> {
|
||||||
return mdc;
|
Map<String, Object> mdc = new HashMap<>();
|
||||||
},
|
mdc.put("txId", message.getTransactionId());
|
||||||
Behaviors.receive(Protocol.class)
|
return mdc;
|
||||||
.onMessage(
|
},
|
||||||
Message.class,
|
Behaviors.receive(Protocol.class)
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.getLog().info(message.toString());
|
Message.class,
|
||||||
return Behaviors.same();
|
message -> {
|
||||||
})
|
context.getLog().info(message.toString());
|
||||||
.build());
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.build()));
|
||||||
|
|
||||||
CustomEventFilter eventFilter =
|
CustomEventFilter eventFilter =
|
||||||
new CustomEventFilter(
|
new CustomEventFilter(
|
||||||
|
|
|
||||||
|
|
@ -40,20 +40,20 @@ public class BehaviorBuilderTest extends JUnitSuite {
|
||||||
Behaviors.receive(Message.class)
|
Behaviors.receive(Message.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
One.class,
|
One.class,
|
||||||
(context, o) -> {
|
o -> {
|
||||||
o.foo();
|
o.foo();
|
||||||
return same();
|
return same();
|
||||||
})
|
})
|
||||||
.onMessage(One.class, o -> o.foo().startsWith("a"), (context, o) -> same())
|
.onMessage(One.class, o -> o.foo().startsWith("a"), o -> same())
|
||||||
.onMessageUnchecked(
|
.onMessageUnchecked(
|
||||||
MyList.class,
|
MyList.class,
|
||||||
(ActorContext<Message> context, MyList<String> l) -> {
|
(MyList<String> l) -> {
|
||||||
String first = l.get(0);
|
String first = l.get(0);
|
||||||
return Behaviors.<Message>same();
|
return Behaviors.<Message>same();
|
||||||
})
|
})
|
||||||
.onSignal(
|
.onSignal(
|
||||||
Terminated.class,
|
Terminated.class,
|
||||||
(context, t) -> {
|
t -> {
|
||||||
System.out.println("Terminating along with " + t.getRef());
|
System.out.println("Terminating along with " + t.getRef());
|
||||||
return stopped();
|
return stopped();
|
||||||
})
|
})
|
||||||
|
|
@ -67,13 +67,13 @@ public class BehaviorBuilderTest extends JUnitSuite {
|
||||||
BehaviorBuilder.create()
|
BehaviorBuilder.create()
|
||||||
.onMessage(
|
.onMessage(
|
||||||
String.class,
|
String.class,
|
||||||
(context, msg) -> {
|
msg -> {
|
||||||
probe.ref().tell("handler 1: " + msg);
|
probe.ref().tell("handler 1: " + msg);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
String.class,
|
String.class,
|
||||||
(context, msg) -> {
|
msg -> {
|
||||||
probe.ref().tell("handler 2: " + msg);
|
probe.ref().tell("handler 2: " + msg);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
@ -90,7 +90,7 @@ public class BehaviorBuilderTest extends JUnitSuite {
|
||||||
BehaviorBuilder.create()
|
BehaviorBuilder.create()
|
||||||
.onMessageEquals(
|
.onMessageEquals(
|
||||||
"message",
|
"message",
|
||||||
(context) -> {
|
() -> {
|
||||||
probe.ref().tell("got it");
|
probe.ref().tell("got it");
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
@ -107,14 +107,14 @@ public class BehaviorBuilderTest extends JUnitSuite {
|
||||||
BehaviorBuilder.create()
|
BehaviorBuilder.create()
|
||||||
.onMessage(
|
.onMessage(
|
||||||
String.class,
|
String.class,
|
||||||
(msg) -> "other".equals(msg),
|
"other"::equals,
|
||||||
(context, msg) -> {
|
msg -> {
|
||||||
probe.ref().tell("handler 1: " + msg);
|
probe.ref().tell("handler 1: " + msg);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
String.class,
|
String.class,
|
||||||
(context, msg) -> {
|
msg -> {
|
||||||
probe.ref().tell("handler 2: " + msg);
|
probe.ref().tell("handler 2: " + msg);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
@ -130,7 +130,7 @@ public class BehaviorBuilderTest extends JUnitSuite {
|
||||||
Behavior<Object> behavior =
|
Behavior<Object> behavior =
|
||||||
BehaviorBuilder.create()
|
BehaviorBuilder.create()
|
||||||
.onAnyMessage(
|
.onAnyMessage(
|
||||||
(context, msg) -> {
|
msg -> {
|
||||||
probe.ref().tell(msg);
|
probe.ref().tell(msg);
|
||||||
return same();
|
return same();
|
||||||
})
|
})
|
||||||
|
|
@ -164,12 +164,12 @@ public class BehaviorBuilderTest extends JUnitSuite {
|
||||||
return Behaviors.receive(CounterMessage.class)
|
return Behaviors.receive(CounterMessage.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Increase.class,
|
Increase.class,
|
||||||
(context, o) -> {
|
o -> {
|
||||||
return immutableCounter(currentValue + 1);
|
return immutableCounter(currentValue + 1);
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Get.class,
|
Get.class,
|
||||||
(context, o) -> {
|
o -> {
|
||||||
o.sender.tell(new Got(currentValue));
|
o.sender.tell(new Got(currentValue));
|
||||||
return same();
|
return same();
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -73,16 +73,18 @@ public class WatchTest extends JUnitSuite {
|
||||||
@Test
|
@Test
|
||||||
public void shouldWatchTerminatingActor() throws Exception {
|
public void shouldWatchTerminatingActor() throws Exception {
|
||||||
Behavior<RunTest> exiting =
|
Behavior<RunTest> exiting =
|
||||||
Behaviors.receive(RunTest.class)
|
Behaviors.setup(
|
||||||
.onMessage(
|
context ->
|
||||||
RunTest.class,
|
Behaviors.receive(RunTest.class)
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
ActorRef<Stop> watched = context.spawn(exitingActor, "exitingActor");
|
RunTest.class,
|
||||||
context.watch(watched);
|
message -> {
|
||||||
watched.tell(new Stop());
|
ActorRef<Stop> watched = context.spawn(exitingActor, "exitingActor");
|
||||||
return waitingForTermination(message.replyTo);
|
context.watch(watched);
|
||||||
})
|
watched.tell(new Stop());
|
||||||
.build();
|
return waitingForTermination(message.replyTo);
|
||||||
|
})
|
||||||
|
.build());
|
||||||
ActorRef<RunTest> exitingRef = testKit.spawn(exiting);
|
ActorRef<RunTest> exitingRef = testKit.spawn(exiting);
|
||||||
|
|
||||||
CompletionStage<Done> result =
|
CompletionStage<Done> result =
|
||||||
|
|
@ -93,16 +95,18 @@ public class WatchTest extends JUnitSuite {
|
||||||
@Test
|
@Test
|
||||||
public void shouldWatchWithCustomMessage() throws Exception {
|
public void shouldWatchWithCustomMessage() throws Exception {
|
||||||
Behavior<Message> exiting =
|
Behavior<Message> exiting =
|
||||||
Behaviors.receive(Message.class)
|
Behaviors.setup(
|
||||||
.onMessage(
|
context ->
|
||||||
RunTest.class,
|
Behaviors.receive(Message.class)
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
ActorRef<Stop> watched = context.spawn(exitingActor, "exitingActor");
|
RunTest.class,
|
||||||
context.watchWith(watched, new CustomTerminationMessage());
|
message -> {
|
||||||
watched.tell(new Stop());
|
ActorRef<Stop> watched = context.spawn(exitingActor, "exitingActor");
|
||||||
return waitingForMessage(message.replyTo);
|
context.watchWith(watched, new CustomTerminationMessage());
|
||||||
})
|
watched.tell(new Stop());
|
||||||
.build();
|
return waitingForMessage(message.replyTo);
|
||||||
|
})
|
||||||
|
.build());
|
||||||
ActorRef<Message> exitingRef = testKit.spawn(exiting);
|
ActorRef<Message> exitingRef = testKit.spawn(exiting);
|
||||||
|
|
||||||
// Not sure why this does not compile without an explicit cast?
|
// Not sure why this does not compile without an explicit cast?
|
||||||
|
|
|
||||||
|
|
@ -97,14 +97,14 @@ public class ReceptionistApiTest {
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Receptionist.Listing.class,
|
Receptionist.Listing.class,
|
||||||
listing -> listing.isForKey(key),
|
listing -> listing.isForKey(key),
|
||||||
(msgCtx, listing) -> {
|
listing -> {
|
||||||
Set<ActorRef<String>> services = listing.getServiceInstances(key);
|
Set<ActorRef<String>> services = listing.getServiceInstances(key);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Receptionist.Registered.class,
|
Receptionist.Registered.class,
|
||||||
registered -> registered.isForKey(key),
|
registered -> registered.isForKey(key),
|
||||||
(msgCtx, registered) -> {
|
registered -> {
|
||||||
ActorRef<String> registree = registered.getServiceInstance(key);
|
ActorRef<String> registree = registered.getServiceInstance(key);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ public class BubblingSample {
|
||||||
Behaviors.receive(Message.class)
|
Behaviors.receive(Message.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Fail.class,
|
Fail.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
throw new RuntimeException(message.text);
|
throw new RuntimeException(message.text);
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -45,7 +45,7 @@ public class BubblingSample {
|
||||||
return Behaviors.receive(Message.class)
|
return Behaviors.receive(Message.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Message.class,
|
Message.class,
|
||||||
(innerCtx, message) -> {
|
message -> {
|
||||||
// just pass messages on to the child
|
// just pass messages on to the child
|
||||||
child.tell(message);
|
child.tell(message);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
|
|
@ -67,7 +67,7 @@ public class BubblingSample {
|
||||||
return Behaviors.receive(Message.class)
|
return Behaviors.receive(Message.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Message.class,
|
Message.class,
|
||||||
(innerCtx, message) -> {
|
message -> {
|
||||||
// just pass messages on to the child
|
// just pass messages on to the child
|
||||||
middleManagement.tell(message);
|
middleManagement.tell(message);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
|
|
|
||||||
|
|
@ -158,14 +158,13 @@ public class FSMDocTest {
|
||||||
private static Behavior<Event> uninitialized() {
|
private static Behavior<Event> uninitialized() {
|
||||||
return Behaviors.receive(Event.class)
|
return Behaviors.receive(Event.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
SetTarget.class,
|
SetTarget.class, message -> idle(new Todo(message.getRef(), Collections.emptyList())))
|
||||||
(context, message) -> idle(new Todo(message.getRef(), Collections.emptyList())))
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Behavior<Event> idle(Todo data) {
|
private static Behavior<Event> idle(Todo data) {
|
||||||
return Behaviors.receive(Event.class)
|
return Behaviors.receive(Event.class)
|
||||||
.onMessage(Queue.class, (context, message) -> active(data.addElement(message)))
|
.onMessage(Queue.class, message -> active(data.addElement(message)))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,16 +174,16 @@ public class FSMDocTest {
|
||||||
// State timeouts done with withTimers
|
// State timeouts done with withTimers
|
||||||
timers.startSingleTimer("Timeout", TIMEOUT, Duration.ofSeconds(1));
|
timers.startSingleTimer("Timeout", TIMEOUT, Duration.ofSeconds(1));
|
||||||
return Behaviors.receive(Event.class)
|
return Behaviors.receive(Event.class)
|
||||||
.onMessage(Queue.class, (context, message) -> active(data.addElement(message)))
|
.onMessage(Queue.class, message -> active(data.addElement(message)))
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Flush.class,
|
Flush.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
data.getTarget().tell(new Batch(data.queue));
|
data.getTarget().tell(new Batch(data.queue));
|
||||||
return idle(data.copy(new ArrayList<>()));
|
return idle(data.copy(new ArrayList<>()));
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Timeout.class,
|
Timeout.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
data.getTarget().tell(new Batch(data.queue));
|
data.getTarget().tell(new Batch(data.queue));
|
||||||
return idle(data.copy(new ArrayList<>()));
|
return idle(data.copy(new ArrayList<>()));
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -40,39 +40,42 @@ public class GracefulStopDocTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final Behavior<JobControlLanguage> mcpa =
|
public static final Behavior<JobControlLanguage> mcpa =
|
||||||
Behaviors.receive(JobControlLanguage.class)
|
Behaviors.setup(
|
||||||
.onMessage(
|
context ->
|
||||||
SpawnJob.class,
|
Behaviors.receive(JobControlLanguage.class)
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.getSystem().log().info("Spawning job {}!", message.name);
|
SpawnJob.class,
|
||||||
context.spawn(Job.job(message.name), message.name);
|
message -> {
|
||||||
return Behaviors.same();
|
context.getSystem().log().info("Spawning job {}!", message.name);
|
||||||
})
|
context.spawn(Job.job(message.name), message.name);
|
||||||
.onSignal(
|
return Behaviors.same();
|
||||||
PostStop.class,
|
})
|
||||||
(context, signal) -> {
|
.onSignal(
|
||||||
context.getSystem().log().info("Master Control Programme stopped");
|
PostStop.class,
|
||||||
return Behaviors.same();
|
signal -> {
|
||||||
})
|
context.getSystem().log().info("Master Control Programme stopped");
|
||||||
.onMessage(
|
return Behaviors.same();
|
||||||
GracefulShutdown.class,
|
})
|
||||||
(context, message) -> {
|
.onMessage(
|
||||||
context.getSystem().log().info("Initiating graceful shutdown...");
|
GracefulShutdown.class,
|
||||||
|
message -> {
|
||||||
|
context.getSystem().log().info("Initiating graceful shutdown...");
|
||||||
|
|
||||||
// perform graceful stop, executing cleanup before final system termination
|
// perform graceful stop, executing cleanup before final system
|
||||||
// behavior executing cleanup is passed as a parameter to Actor.stopped
|
// termination
|
||||||
return Behaviors.stopped(
|
// behavior executing cleanup is passed as a parameter to Actor.stopped
|
||||||
() -> {
|
return Behaviors.stopped(
|
||||||
context.getSystem().log().info("Cleanup!");
|
() -> {
|
||||||
});
|
context.getSystem().log().info("Cleanup!");
|
||||||
})
|
});
|
||||||
.onSignal(
|
})
|
||||||
PostStop.class,
|
.onSignal(
|
||||||
(context, signal) -> {
|
PostStop.class,
|
||||||
context.getSystem().log().info("Master Control Programme stopped");
|
signal -> {
|
||||||
return Behaviors.same();
|
context.getSystem().log().info("Master Control Programme stopped");
|
||||||
})
|
return Behaviors.same();
|
||||||
.build();
|
})
|
||||||
|
.build());
|
||||||
}
|
}
|
||||||
// #master-actor
|
// #master-actor
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,14 +33,16 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
static final Behavior<PrintMe> printerBehavior =
|
static final Behavior<PrintMe> printerBehavior =
|
||||||
Behaviors.receive(PrintMe.class)
|
Behaviors.setup(
|
||||||
.onMessage(
|
context ->
|
||||||
PrintMe.class,
|
Behaviors.receive(PrintMe.class)
|
||||||
(context, printMe) -> {
|
.onMessage(
|
||||||
context.getLog().info(printMe.message);
|
PrintMe.class,
|
||||||
return Behaviors.same();
|
printMe -> {
|
||||||
})
|
context.getLog().info(printMe.message);
|
||||||
.build();
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.build());
|
||||||
// #fire-and-forget-definition
|
// #fire-and-forget-definition
|
||||||
|
|
||||||
// #request-response-protocol
|
// #request-response-protocol
|
||||||
|
|
@ -70,7 +72,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
Behaviors.receive(Request.class)
|
Behaviors.receive(Request.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Request.class,
|
Request.class,
|
||||||
(context, request) -> {
|
request -> {
|
||||||
// ... process request ...
|
// ... process request ...
|
||||||
request.respondTo.tell(new Response("Here's your response!"));
|
request.respondTo.tell(new Response("Here's your response!"));
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
|
|
@ -314,7 +316,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
return Behaviors.receive(Msg.class)
|
return Behaviors.receive(Msg.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Msg.class,
|
Msg.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after);
|
timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after);
|
||||||
List<Msg> buffer = new ArrayList<>();
|
List<Msg> buffer = new ArrayList<>();
|
||||||
buffer.add(message);
|
buffer.add(message);
|
||||||
|
|
@ -332,13 +334,13 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
return Behaviors.receive(Msg.class)
|
return Behaviors.receive(Msg.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
TimeoutMsg.class,
|
TimeoutMsg.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
target.tell(new Batch(buffer));
|
target.tell(new Batch(buffer));
|
||||||
return idle(timers, target, after, maxSize);
|
return idle(timers, target, after, maxSize);
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Msg.class,
|
Msg.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
buffer.add(message);
|
buffer.add(message);
|
||||||
if (buffer.size() == maxSize) {
|
if (buffer.size() == maxSize) {
|
||||||
timers.cancel(TIMER_KEY);
|
timers.cancel(TIMER_KEY);
|
||||||
|
|
@ -399,7 +401,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
Behaviors.receive(HalCommand.class)
|
Behaviors.receive(HalCommand.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
OpenThePodBayDoorsPlease.class,
|
OpenThePodBayDoorsPlease.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
message.respondTo.tell(
|
message.respondTo.tell(
|
||||||
new HalResponse("I'm sorry, Dave. I'm afraid I can't do that."));
|
new HalResponse("I'm sorry, Dave. I'm afraid I can't do that."));
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
|
|
@ -465,8 +467,8 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
// message sent to the actor
|
// message sent to the actor
|
||||||
.onMessage(
|
.onMessage(
|
||||||
AdaptedResponse.class,
|
AdaptedResponse.class,
|
||||||
(innerCtx, response) -> {
|
response -> {
|
||||||
innerCtx.getLog().info("Got response from HAL: {}", response.message);
|
context.getLog().info("Got response from HAL: {}", response.message);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -571,7 +573,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
||||||
return Behaviors.receive(HomeCommand.class)
|
return Behaviors.receive(HomeCommand.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
LeaveHome.class,
|
LeaveHome.class,
|
||||||
(innerCtx, message) -> {
|
message -> {
|
||||||
context.spawn(
|
context.spawn(
|
||||||
new PrepareToLeaveHome(message.who, message.respondTo, keyCabinet, drawer),
|
new PrepareToLeaveHome(message.who, message.respondTo, keyCabinet, drawer),
|
||||||
"leaving" + message.who);
|
"leaving" + message.who);
|
||||||
|
|
|
||||||
|
|
@ -12,10 +12,12 @@ import akka.actor.typed.Behavior;
|
||||||
import akka.actor.typed.Terminated;
|
import akka.actor.typed.Terminated;
|
||||||
import akka.actor.typed.Props;
|
import akka.actor.typed.Props;
|
||||||
import akka.actor.typed.DispatcherSelector;
|
import akka.actor.typed.DispatcherSelector;
|
||||||
|
import akka.actor.typed.javadsl.ActorContext;
|
||||||
import akka.actor.typed.javadsl.Behaviors;
|
import akka.actor.typed.javadsl.Behaviors;
|
||||||
|
|
||||||
// #imports
|
// #imports
|
||||||
|
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
@ -153,8 +155,10 @@ public class IntroTest {
|
||||||
system.terminate();
|
system.terminate();
|
||||||
}
|
}
|
||||||
|
|
||||||
// #chatroom-actor
|
// #chatroom-behavior
|
||||||
public static class ChatRoom {
|
public static class ChatRoom {
|
||||||
|
// #chatroom-behavior
|
||||||
|
|
||||||
// #chatroom-protocol
|
// #chatroom-protocol
|
||||||
static interface RoomCommand {}
|
static interface RoomCommand {}
|
||||||
|
|
||||||
|
|
@ -229,90 +233,109 @@ public class IntroTest {
|
||||||
// #chatroom-protocol
|
// #chatroom-protocol
|
||||||
// #chatroom-behavior
|
// #chatroom-behavior
|
||||||
|
|
||||||
public static Behavior<RoomCommand> behavior() {
|
public static Behavior<RoomCommand> create() {
|
||||||
return chatRoom(new ArrayList<ActorRef<SessionCommand>>());
|
return Behaviors.setup(
|
||||||
|
ctx -> new ChatRoom(ctx).chatRoom(new ArrayList<ActorRef<SessionCommand>>()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Behavior<RoomCommand> chatRoom(List<ActorRef<SessionCommand>> sessions) {
|
private final ActorContext<RoomCommand> context;
|
||||||
|
|
||||||
|
private ChatRoom(ActorContext<RoomCommand> context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<RoomCommand> chatRoom(List<ActorRef<SessionCommand>> sessions) {
|
||||||
return Behaviors.receive(RoomCommand.class)
|
return Behaviors.receive(RoomCommand.class)
|
||||||
.onMessage(
|
.onMessage(GetSession.class, getSession -> onGetSession(sessions, getSession))
|
||||||
GetSession.class,
|
.onMessage(PublishSessionMessage.class, pub -> onPublishSessionMessage(sessions, pub))
|
||||||
(context, getSession) -> {
|
|
||||||
ActorRef<SessionEvent> client = getSession.replyTo;
|
|
||||||
ActorRef<SessionCommand> ses =
|
|
||||||
context.spawn(
|
|
||||||
session(context.getSelf(), getSession.screenName, client),
|
|
||||||
URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name()));
|
|
||||||
// narrow to only expose PostMessage
|
|
||||||
client.tell(new SessionGranted(ses.narrow()));
|
|
||||||
List<ActorRef<SessionCommand>> newSessions = new ArrayList<>(sessions);
|
|
||||||
newSessions.add(ses);
|
|
||||||
return chatRoom(newSessions);
|
|
||||||
})
|
|
||||||
.onMessage(
|
|
||||||
PublishSessionMessage.class,
|
|
||||||
(context, pub) -> {
|
|
||||||
NotifyClient notification =
|
|
||||||
new NotifyClient((new MessagePosted(pub.screenName, pub.message)));
|
|
||||||
sessions.forEach(s -> s.tell(notification));
|
|
||||||
return Behaviors.same();
|
|
||||||
})
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Behavior<ChatRoom.SessionCommand> session(
|
private Behavior<RoomCommand> onGetSession(
|
||||||
ActorRef<RoomCommand> room, String screenName, ActorRef<SessionEvent> client) {
|
List<ActorRef<SessionCommand>> sessions, GetSession getSession)
|
||||||
return Behaviors.receive(ChatRoom.SessionCommand.class)
|
throws UnsupportedEncodingException {
|
||||||
.onMessage(
|
ActorRef<SessionEvent> client = getSession.replyTo;
|
||||||
PostMessage.class,
|
ActorRef<SessionCommand> ses =
|
||||||
(context, post) -> {
|
context.spawn(
|
||||||
// from client, publish to others via the room
|
Session.create(context.getSelf(), getSession.screenName, client),
|
||||||
room.tell(new PublishSessionMessage(screenName, post.message));
|
URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name()));
|
||||||
return Behaviors.same();
|
// narrow to only expose PostMessage
|
||||||
})
|
client.tell(new SessionGranted(ses.narrow()));
|
||||||
.onMessage(
|
List<ActorRef<SessionCommand>> newSessions = new ArrayList<>(sessions);
|
||||||
NotifyClient.class,
|
newSessions.add(ses);
|
||||||
(context, notification) -> {
|
return chatRoom(newSessions);
|
||||||
// published from the room
|
|
||||||
client.tell(notification.message);
|
|
||||||
return Behaviors.same();
|
|
||||||
})
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
// #chatroom-behavior
|
|
||||||
|
|
||||||
|
private Behavior<RoomCommand> onPublishSessionMessage(
|
||||||
|
List<ActorRef<SessionCommand>> sessions, PublishSessionMessage pub) {
|
||||||
|
NotifyClient notification =
|
||||||
|
new NotifyClient((new MessagePosted(pub.screenName, pub.message)));
|
||||||
|
sessions.forEach(s -> s.tell(notification));
|
||||||
|
return Behaviors.same();
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Session {
|
||||||
|
static Behavior<ChatRoom.SessionCommand> create(
|
||||||
|
ActorRef<RoomCommand> room, String screenName, ActorRef<SessionEvent> client) {
|
||||||
|
return Behaviors.receive(ChatRoom.SessionCommand.class)
|
||||||
|
.onMessage(PostMessage.class, post -> onPostMessage(room, screenName, post))
|
||||||
|
.onMessage(NotifyClient.class, notification -> onNotifyClient(client, notification))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Behavior<SessionCommand> onPostMessage(
|
||||||
|
ActorRef<RoomCommand> room, String screenName, PostMessage post) {
|
||||||
|
// from client, publish to others via the room
|
||||||
|
room.tell(new PublishSessionMessage(screenName, post.message));
|
||||||
|
return Behaviors.same();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Behavior<SessionCommand> onNotifyClient(
|
||||||
|
ActorRef<SessionEvent> client, NotifyClient notification) {
|
||||||
|
// published from the room
|
||||||
|
client.tell(notification.message);
|
||||||
|
return Behaviors.same();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// #chatroom-actor
|
// #chatroom-behavior
|
||||||
|
|
||||||
// #chatroom-gabbler
|
// #chatroom-gabbler
|
||||||
public abstract static class Gabbler {
|
public static class Gabbler {
|
||||||
private Gabbler() {}
|
public static Behavior<ChatRoom.SessionEvent> create() {
|
||||||
|
return Behaviors.setup(ctx -> new Gabbler(ctx).behavior());
|
||||||
|
}
|
||||||
|
|
||||||
public static Behavior<ChatRoom.SessionEvent> behavior() {
|
private final ActorContext<ChatRoom.SessionEvent> context;
|
||||||
|
|
||||||
|
private Gabbler(ActorContext<ChatRoom.SessionEvent> context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<ChatRoom.SessionEvent> behavior() {
|
||||||
return Behaviors.receive(ChatRoom.SessionEvent.class)
|
return Behaviors.receive(ChatRoom.SessionEvent.class)
|
||||||
.onMessage(
|
.onMessage(ChatRoom.SessionDenied.class, this::onSessionDenied)
|
||||||
ChatRoom.SessionDenied.class,
|
.onMessage(ChatRoom.SessionGranted.class, this::onSessionGranted)
|
||||||
(context, message) -> {
|
.onMessage(ChatRoom.MessagePosted.class, this::onMessagePosted)
|
||||||
context.getLog().info("cannot start chat room session: {}", message.reason);
|
|
||||||
return Behaviors.stopped();
|
|
||||||
})
|
|
||||||
.onMessage(
|
|
||||||
ChatRoom.SessionGranted.class,
|
|
||||||
(context, message) -> {
|
|
||||||
message.handle.tell(new ChatRoom.PostMessage("Hello World!"));
|
|
||||||
return Behaviors.same();
|
|
||||||
})
|
|
||||||
.onMessage(
|
|
||||||
ChatRoom.MessagePosted.class,
|
|
||||||
(context, message) -> {
|
|
||||||
context
|
|
||||||
.getLog()
|
|
||||||
.info(
|
|
||||||
"message has been posted by '{}': {}", message.screenName, message.message);
|
|
||||||
return Behaviors.stopped();
|
|
||||||
})
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Behavior<ChatRoom.SessionEvent> onSessionDenied(ChatRoom.SessionDenied message) {
|
||||||
|
context.getLog().info("cannot start chat room session: {}", message.reason);
|
||||||
|
return Behaviors.stopped();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<ChatRoom.SessionEvent> onSessionGranted(ChatRoom.SessionGranted message) {
|
||||||
|
message.handle.tell(new ChatRoom.PostMessage("Hello World!"));
|
||||||
|
return Behaviors.same();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<ChatRoom.SessionEvent> onMessagePosted(ChatRoom.MessagePosted message) {
|
||||||
|
context
|
||||||
|
.getLog()
|
||||||
|
.info("message has been posted by '{}': {}", message.screenName, message.message);
|
||||||
|
return Behaviors.stopped();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// #chatroom-gabbler
|
// #chatroom-gabbler
|
||||||
|
|
||||||
|
|
@ -323,9 +346,8 @@ public class IntroTest {
|
||||||
Behaviors.setup(
|
Behaviors.setup(
|
||||||
context -> {
|
context -> {
|
||||||
ActorRef<ChatRoom.RoomCommand> chatRoom =
|
ActorRef<ChatRoom.RoomCommand> chatRoom =
|
||||||
context.spawn(ChatRoom.behavior(), "chatRoom");
|
context.spawn(ChatRoom.create(), "chatRoom");
|
||||||
ActorRef<ChatRoom.SessionEvent> gabbler =
|
ActorRef<ChatRoom.SessionEvent> gabbler = context.spawn(Gabbler.create(), "gabbler");
|
||||||
context.spawn(Gabbler.behavior(), "gabbler");
|
|
||||||
context.watch(gabbler);
|
context.watch(gabbler);
|
||||||
chatRoom.tell(new ChatRoom.GetSession("ol’ Gabbler", gabbler));
|
chatRoom.tell(new ChatRoom.GetSession("ol’ Gabbler", gabbler));
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import akka.actor.typed.Behavior;
|
||||||
import akka.actor.typed.Terminated;
|
import akka.actor.typed.Terminated;
|
||||||
import akka.actor.typed.javadsl.*;
|
import akka.actor.typed.javadsl.*;
|
||||||
// #imports
|
// #imports
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
@ -18,8 +19,9 @@ import java.util.List;
|
||||||
|
|
||||||
public class OOIntroTest {
|
public class OOIntroTest {
|
||||||
|
|
||||||
// #chatroom-actor
|
// #chatroom-behavior
|
||||||
public static class ChatRoom {
|
public static class ChatRoom {
|
||||||
|
// #chatroom-behavior
|
||||||
// #chatroom-protocol
|
// #chatroom-protocol
|
||||||
static interface RoomCommand {}
|
static interface RoomCommand {}
|
||||||
|
|
||||||
|
|
@ -94,7 +96,7 @@ public class OOIntroTest {
|
||||||
// #chatroom-protocol
|
// #chatroom-protocol
|
||||||
// #chatroom-behavior
|
// #chatroom-behavior
|
||||||
|
|
||||||
public static Behavior<RoomCommand> behavior() {
|
public static Behavior<RoomCommand> create() {
|
||||||
return Behaviors.setup(ChatRoomBehavior::new);
|
return Behaviors.setup(ChatRoomBehavior::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -102,7 +104,7 @@ public class OOIntroTest {
|
||||||
final ActorContext<RoomCommand> context;
|
final ActorContext<RoomCommand> context;
|
||||||
final List<ActorRef<SessionCommand>> sessions = new ArrayList<>();
|
final List<ActorRef<SessionCommand>> sessions = new ArrayList<>();
|
||||||
|
|
||||||
public ChatRoomBehavior(ActorContext<RoomCommand> context) {
|
private ChatRoomBehavior(ActorContext<RoomCommand> context) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -110,40 +112,39 @@ public class OOIntroTest {
|
||||||
public Receive<RoomCommand> createReceive() {
|
public Receive<RoomCommand> createReceive() {
|
||||||
ReceiveBuilder<RoomCommand> builder = newReceiveBuilder();
|
ReceiveBuilder<RoomCommand> builder = newReceiveBuilder();
|
||||||
|
|
||||||
builder.onMessage(
|
builder.onMessage(GetSession.class, this::onGetSession);
|
||||||
GetSession.class,
|
builder.onMessage(PublishSessionMessage.class, this::onPublishSessionMessage);
|
||||||
getSession -> {
|
|
||||||
ActorRef<SessionEvent> client = getSession.replyTo;
|
|
||||||
ActorRef<SessionCommand> ses =
|
|
||||||
context.spawn(
|
|
||||||
new SessionBehavior(context.getSelf(), getSession.screenName, client),
|
|
||||||
URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name()));
|
|
||||||
// narrow to only expose PostMessage
|
|
||||||
client.tell(new SessionGranted(ses.narrow()));
|
|
||||||
sessions.add(ses);
|
|
||||||
return this;
|
|
||||||
});
|
|
||||||
|
|
||||||
builder.onMessage(
|
|
||||||
PublishSessionMessage.class,
|
|
||||||
pub -> {
|
|
||||||
NotifyClient notification =
|
|
||||||
new NotifyClient((new MessagePosted(pub.screenName, pub.message)));
|
|
||||||
sessions.forEach(s -> s.tell(notification));
|
|
||||||
return this;
|
|
||||||
});
|
|
||||||
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Behavior<RoomCommand> onGetSession(GetSession getSession)
|
||||||
|
throws UnsupportedEncodingException {
|
||||||
|
ActorRef<SessionEvent> client = getSession.replyTo;
|
||||||
|
ActorRef<SessionCommand> ses =
|
||||||
|
context.spawn(
|
||||||
|
new SessionBehavior(context.getSelf(), getSession.screenName, client),
|
||||||
|
URLEncoder.encode(getSession.screenName, StandardCharsets.UTF_8.name()));
|
||||||
|
// narrow to only expose PostMessage
|
||||||
|
client.tell(new SessionGranted(ses.narrow()));
|
||||||
|
sessions.add(ses);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<RoomCommand> onPublishSessionMessage(PublishSessionMessage pub) {
|
||||||
|
NotifyClient notification =
|
||||||
|
new NotifyClient((new MessagePosted(pub.screenName, pub.message)));
|
||||||
|
sessions.forEach(s -> s.tell(notification));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SessionBehavior extends AbstractBehavior<ChatRoom.SessionCommand> {
|
static class SessionBehavior extends AbstractBehavior<ChatRoom.SessionCommand> {
|
||||||
|
|
||||||
private final ActorRef<RoomCommand> room;
|
private final ActorRef<RoomCommand> room;
|
||||||
private final String screenName;
|
private final String screenName;
|
||||||
private final ActorRef<SessionEvent> client;
|
private final ActorRef<SessionEvent> client;
|
||||||
|
|
||||||
public SessionBehavior(
|
SessionBehavior(
|
||||||
ActorRef<RoomCommand> room, String screenName, ActorRef<SessionEvent> client) {
|
ActorRef<RoomCommand> room, String screenName, ActorRef<SessionEvent> client) {
|
||||||
this.room = room;
|
this.room = room;
|
||||||
this.screenName = screenName;
|
this.screenName = screenName;
|
||||||
|
|
@ -153,33 +154,35 @@ public class OOIntroTest {
|
||||||
@Override
|
@Override
|
||||||
public Receive<SessionCommand> createReceive() {
|
public Receive<SessionCommand> createReceive() {
|
||||||
return newReceiveBuilder()
|
return newReceiveBuilder()
|
||||||
.onMessage(
|
.onMessage(PostMessage.class, this::onPostMessage)
|
||||||
PostMessage.class,
|
.onMessage(NotifyClient.class, this::onNotifyClient)
|
||||||
post -> {
|
|
||||||
// from client, publish to others via the room
|
|
||||||
room.tell(new PublishSessionMessage(screenName, post.message));
|
|
||||||
return Behaviors.same();
|
|
||||||
})
|
|
||||||
.onMessage(
|
|
||||||
NotifyClient.class,
|
|
||||||
notification -> {
|
|
||||||
// published from the room
|
|
||||||
client.tell(notification.message);
|
|
||||||
return Behaviors.same();
|
|
||||||
})
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Behavior<SessionCommand> onPostMessage(PostMessage post) {
|
||||||
|
// from client, publish to others via the room
|
||||||
|
room.tell(new PublishSessionMessage(screenName, post.message));
|
||||||
|
return Behaviors.same();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<SessionCommand> onNotifyClient(NotifyClient notification) {
|
||||||
|
// published from the room
|
||||||
|
client.tell(notification.message);
|
||||||
|
return Behaviors.same();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// #chatroom-behavior
|
|
||||||
}
|
}
|
||||||
// #chatroom-actor
|
// #chatroom-behavior
|
||||||
|
|
||||||
// #chatroom-gabbler
|
// #chatroom-gabbler
|
||||||
public static class Gabbler extends AbstractBehavior<ChatRoom.SessionEvent> {
|
public static class Gabbler extends AbstractBehavior<ChatRoom.SessionEvent> {
|
||||||
|
public static Behavior<ChatRoom.SessionEvent> create() {
|
||||||
|
return Behaviors.setup(Gabbler::new);
|
||||||
|
}
|
||||||
|
|
||||||
private ActorContext<ChatRoom.SessionEvent> context;
|
private ActorContext<ChatRoom.SessionEvent> context;
|
||||||
|
|
||||||
public Gabbler(ActorContext<ChatRoom.SessionEvent> context) {
|
private Gabbler(ActorContext<ChatRoom.SessionEvent> context) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -187,32 +190,27 @@ public class OOIntroTest {
|
||||||
public Receive<ChatRoom.SessionEvent> createReceive() {
|
public Receive<ChatRoom.SessionEvent> createReceive() {
|
||||||
ReceiveBuilder<ChatRoom.SessionEvent> builder = newReceiveBuilder();
|
ReceiveBuilder<ChatRoom.SessionEvent> builder = newReceiveBuilder();
|
||||||
return builder
|
return builder
|
||||||
.onMessage(
|
.onMessage(ChatRoom.SessionDenied.class, this::onSessionDenied)
|
||||||
ChatRoom.SessionDenied.class,
|
.onMessage(ChatRoom.SessionGranted.class, this::onSessionGranted)
|
||||||
message -> {
|
.onMessage(ChatRoom.MessagePosted.class, this::onMessagePosted)
|
||||||
context.getLog().info("cannot start chat room session: {}", message.reason);
|
|
||||||
return Behaviors.stopped();
|
|
||||||
})
|
|
||||||
.onMessage(
|
|
||||||
ChatRoom.SessionGranted.class,
|
|
||||||
message -> {
|
|
||||||
message.handle.tell(new ChatRoom.PostMessage("Hello World!"));
|
|
||||||
return Behaviors.same();
|
|
||||||
})
|
|
||||||
.onMessage(
|
|
||||||
ChatRoom.MessagePosted.class,
|
|
||||||
message -> {
|
|
||||||
context
|
|
||||||
.getLog()
|
|
||||||
.info(
|
|
||||||
"message has been posted by '{}': {}", message.screenName, message.message);
|
|
||||||
return Behaviors.stopped();
|
|
||||||
})
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Behavior<ChatRoom.SessionEvent> behavior() {
|
private Behavior<ChatRoom.SessionEvent> onSessionDenied(ChatRoom.SessionDenied message) {
|
||||||
return Behaviors.setup(Gabbler::new);
|
context.getLog().info("cannot start chat room session: {}", message.reason);
|
||||||
|
return Behaviors.stopped();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<ChatRoom.SessionEvent> onSessionGranted(ChatRoom.SessionGranted message) {
|
||||||
|
message.handle.tell(new ChatRoom.PostMessage("Hello World!"));
|
||||||
|
return Behaviors.same();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<ChatRoom.SessionEvent> onMessagePosted(ChatRoom.MessagePosted message) {
|
||||||
|
context
|
||||||
|
.getLog()
|
||||||
|
.info("message has been posted by '{}': {}", message.screenName, message.message);
|
||||||
|
return Behaviors.stopped();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// #chatroom-gabbler
|
// #chatroom-gabbler
|
||||||
|
|
@ -224,9 +222,8 @@ public class OOIntroTest {
|
||||||
Behaviors.setup(
|
Behaviors.setup(
|
||||||
context -> {
|
context -> {
|
||||||
ActorRef<ChatRoom.RoomCommand> chatRoom =
|
ActorRef<ChatRoom.RoomCommand> chatRoom =
|
||||||
context.spawn(ChatRoom.behavior(), "chatRoom");
|
context.spawn(ChatRoom.create(), "chatRoom");
|
||||||
ActorRef<ChatRoom.SessionEvent> gabbler =
|
ActorRef<ChatRoom.SessionEvent> gabbler = context.spawn(Gabbler.create(), "gabbler");
|
||||||
context.spawn(Gabbler.behavior(), "gabbler");
|
|
||||||
context.watch(gabbler);
|
context.watch(gabbler);
|
||||||
chatRoom.tell(new ChatRoom.GetSession("ol’ Gabbler", gabbler));
|
chatRoom.tell(new ChatRoom.GetSession("ol’ Gabbler", gabbler));
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ public class RouterTest {
|
||||||
return Behaviors.receive(Command.class)
|
return Behaviors.receive(Command.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
DoLog.class,
|
DoLog.class,
|
||||||
(notUsed, doLog) -> {
|
doLog -> {
|
||||||
context.getLog().info("Got message {}", doLog.text);
|
context.getLog().info("Got message {}", doLog.text);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
package jdocs.akka.typed;
|
package jdocs.akka.typed;
|
||||||
|
|
||||||
// #import
|
// #import
|
||||||
|
import akka.actor.typed.javadsl.ActorContext;
|
||||||
import akka.actor.typed.javadsl.StashBuffer;
|
import akka.actor.typed.javadsl.StashBuffer;
|
||||||
// #import
|
// #import
|
||||||
|
|
||||||
|
|
@ -64,10 +65,8 @@ public class StashDocTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class SaveSuccess implements Command {
|
enum SaveSuccess implements Command {
|
||||||
public static final SaveSuccess instance = new SaveSuccess();
|
INSTANCE
|
||||||
|
|
||||||
private SaveSuccess() {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class DBError implements Command {
|
static class DBError implements Command {
|
||||||
|
|
@ -78,26 +77,28 @@ public class StashDocTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final ActorContext<Command> context;
|
||||||
private final StashBuffer<Command> buffer = StashBuffer.create(100);
|
private final StashBuffer<Command> buffer = StashBuffer.create(100);
|
||||||
private final String id;
|
private final String id;
|
||||||
private final DB db;
|
private final DB db;
|
||||||
|
|
||||||
public DataAccess(String id, DB db) {
|
private DataAccess(ActorContext<Command> context, String id, DB db) {
|
||||||
|
this.context = context;
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.db = db;
|
this.db = db;
|
||||||
}
|
}
|
||||||
|
|
||||||
Behavior<Command> behavior() {
|
public static Behavior<Command> create(String id, DB db) {
|
||||||
return Behaviors.setup(
|
return Behaviors.setup(
|
||||||
context -> {
|
ctx -> {
|
||||||
context.pipeToSelf(
|
ctx.pipeToSelf(
|
||||||
db.load(id),
|
db.load(id),
|
||||||
(value, cause) -> {
|
(value, cause) -> {
|
||||||
if (cause == null) return new InitialState(value);
|
if (cause == null) return new InitialState(value);
|
||||||
else return new DBError(asRuntimeException(cause));
|
else return new DBError(asRuntimeException(cause));
|
||||||
});
|
});
|
||||||
|
|
||||||
return init();
|
return new DataAccess(ctx, id, db).init();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -105,18 +106,18 @@ public class StashDocTest extends JUnitSuite {
|
||||||
return Behaviors.receive(Command.class)
|
return Behaviors.receive(Command.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
InitialState.class,
|
InitialState.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
// now we are ready to handle stashed messages if any
|
// now we are ready to handle stashed messages if any
|
||||||
return buffer.unstashAll(context, active(message.value));
|
return buffer.unstashAll(context, active(message.value));
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
DBError.class,
|
DBError.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
throw message.cause;
|
throw message.cause;
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Command.class,
|
Command.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
// stash all other messages for later processing
|
// stash all other messages for later processing
|
||||||
buffer.stash(message);
|
buffer.stash(message);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
|
|
@ -128,17 +129,17 @@ public class StashDocTest extends JUnitSuite {
|
||||||
return Behaviors.receive(Command.class)
|
return Behaviors.receive(Command.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Get.class,
|
Get.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
message.replyTo.tell(state);
|
message.replyTo.tell(state);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Save.class,
|
Save.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
context.pipeToSelf(
|
context.pipeToSelf(
|
||||||
db.save(id, message.payload),
|
db.save(id, message.payload),
|
||||||
(value, cause) -> {
|
(value, cause) -> {
|
||||||
if (cause == null) return SaveSuccess.instance;
|
if (cause == null) return SaveSuccess.INSTANCE;
|
||||||
else return new DBError(asRuntimeException(cause));
|
else return new DBError(asRuntimeException(cause));
|
||||||
});
|
});
|
||||||
return saving(message.payload, message.replyTo);
|
return saving(message.payload, message.replyTo);
|
||||||
|
|
@ -148,20 +149,20 @@ public class StashDocTest extends JUnitSuite {
|
||||||
|
|
||||||
private Behavior<Command> saving(String state, ActorRef<Done> replyTo) {
|
private Behavior<Command> saving(String state, ActorRef<Done> replyTo) {
|
||||||
return Behaviors.receive(Command.class)
|
return Behaviors.receive(Command.class)
|
||||||
.onMessageEquals(
|
.onMessage(
|
||||||
SaveSuccess.instance,
|
SaveSuccess.class,
|
||||||
context -> {
|
message -> {
|
||||||
replyTo.tell(Done.getInstance());
|
replyTo.tell(Done.getInstance());
|
||||||
return buffer.unstashAll(context, active(state));
|
return buffer.unstashAll(context, active(state));
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
DBError.class,
|
DBError.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
throw message.cause;
|
throw message.cause;
|
||||||
})
|
})
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Command.class,
|
Command.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
buffer.stash(message);
|
buffer.stash(message);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
@ -195,8 +196,7 @@ public class StashDocTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
final ActorRef<DataAccess.Command> dataAccess =
|
final ActorRef<DataAccess.Command> dataAccess = testKit.spawn(DataAccess.create("17", db));
|
||||||
testKit.spawn(new DataAccess("17", db).behavior());
|
|
||||||
TestProbe<String> getInbox = testKit.createTestProbe(String.class);
|
TestProbe<String> getInbox = testKit.createTestProbe(String.class);
|
||||||
dataAccess.tell(new DataAccess.Get(getInbox.getRef()));
|
dataAccess.tell(new DataAccess.Get(getInbox.getRef()));
|
||||||
getInbox.expectMessage("TheValue");
|
getInbox.expectMessage("TheValue");
|
||||||
|
|
|
||||||
|
|
@ -50,11 +50,11 @@ public class TypedWatchingUntypedTest extends JUnitSuite {
|
||||||
return akka.actor.typed.javadsl.Behaviors.receive(Typed.Command.class)
|
return akka.actor.typed.javadsl.Behaviors.receive(Typed.Command.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Typed.Pong.class,
|
Typed.Pong.class,
|
||||||
(_ctx, message) -> {
|
message -> {
|
||||||
Adapter.stop(context, second);
|
Adapter.stop(context, second);
|
||||||
return same();
|
return same();
|
||||||
})
|
})
|
||||||
.onSignal(akka.actor.typed.Terminated.class, (_ctx, sig) -> stopped())
|
.onSignal(akka.actor.typed.Terminated.class, sig -> stopped())
|
||||||
.build();
|
.build();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -74,7 +74,7 @@ public class UntypedWatchingTypedTest extends JUnitSuite {
|
||||||
return Behaviors.receive(Typed.Command.class)
|
return Behaviors.receive(Typed.Command.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Typed.Ping.class,
|
Typed.Ping.class,
|
||||||
(context, message) -> {
|
message -> {
|
||||||
message.replyTo.tell(new Pong());
|
message.replyTo.tell(new Pong());
|
||||||
return same();
|
return same();
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -36,14 +36,10 @@ public class SupervisionCompileOnlyTest {
|
||||||
|
|
||||||
public static Behavior<CounterMessage> counter(int currentValue) {
|
public static Behavior<CounterMessage> counter(int currentValue) {
|
||||||
return Behaviors.receive(CounterMessage.class)
|
return Behaviors.receive(CounterMessage.class)
|
||||||
.onMessage(
|
.onMessage(Increase.class, o -> counter(currentValue + 1))
|
||||||
Increase.class,
|
|
||||||
(context, o) -> {
|
|
||||||
return counter(currentValue + 1);
|
|
||||||
})
|
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Get.class,
|
Get.class,
|
||||||
(context, o) -> {
|
o -> {
|
||||||
o.sender.tell(new Got(currentValue));
|
o.sender.tell(new Got(currentValue));
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -103,8 +103,9 @@ object IntroSpec {
|
||||||
//#hello-world-main-with-dispatchers
|
//#hello-world-main-with-dispatchers
|
||||||
}
|
}
|
||||||
|
|
||||||
//#chatroom-actor
|
//#chatroom-behavior
|
||||||
object ChatRoom {
|
object ChatRoom {
|
||||||
|
//#chatroom-behavior
|
||||||
//#chatroom-protocol
|
//#chatroom-protocol
|
||||||
sealed trait RoomCommand
|
sealed trait RoomCommand
|
||||||
final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommand
|
final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommand
|
||||||
|
|
@ -125,7 +126,7 @@ object IntroSpec {
|
||||||
//#chatroom-protocol
|
//#chatroom-protocol
|
||||||
//#chatroom-behavior
|
//#chatroom-behavior
|
||||||
|
|
||||||
val behavior: Behavior[RoomCommand] =
|
def apply(): Behavior[RoomCommand] =
|
||||||
chatRoom(List.empty)
|
chatRoom(List.empty)
|
||||||
|
|
||||||
private def chatRoom(sessions: List[ActorRef[SessionCommand]]): Behavior[RoomCommand] =
|
private def chatRoom(sessions: List[ActorRef[SessionCommand]]): Behavior[RoomCommand] =
|
||||||
|
|
@ -159,9 +160,32 @@ object IntroSpec {
|
||||||
client ! message
|
client ! message
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
}
|
}
|
||||||
//#chatroom-behavior
|
|
||||||
}
|
}
|
||||||
//#chatroom-actor
|
//#chatroom-behavior
|
||||||
|
|
||||||
|
//#chatroom-gabbler
|
||||||
|
object Gabbler {
|
||||||
|
import ChatRoom._
|
||||||
|
|
||||||
|
def apply(): Behavior[SessionEvent] =
|
||||||
|
Behaviors.setup { context =>
|
||||||
|
Behaviors.receiveMessage {
|
||||||
|
//#chatroom-gabbler
|
||||||
|
// We document that the compiler warns about the missing handler for `SessionDenied`
|
||||||
|
case SessionDenied(reason) =>
|
||||||
|
context.log.info("cannot start chat room session: {}", reason)
|
||||||
|
Behaviors.stopped
|
||||||
|
//#chatroom-gabbler
|
||||||
|
case SessionGranted(handle) =>
|
||||||
|
handle ! PostMessage("Hello World!")
|
||||||
|
Behaviors.same
|
||||||
|
case MessagePosted(screenName, message) =>
|
||||||
|
context.log.info("message has been posted by '{}': {}", screenName, message)
|
||||||
|
Behaviors.stopped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#chatroom-gabbler
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -188,35 +212,13 @@ class IntroSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||||
}
|
}
|
||||||
|
|
||||||
"chat" in {
|
"chat" in {
|
||||||
//#chatroom-gabbler
|
|
||||||
import ChatRoom._
|
|
||||||
|
|
||||||
val gabbler: Behavior[SessionEvent] =
|
|
||||||
Behaviors.setup { context =>
|
|
||||||
Behaviors.receiveMessage {
|
|
||||||
//#chatroom-gabbler
|
|
||||||
// We document that the compiler warns about the missing handler for `SessionDenied`
|
|
||||||
case SessionDenied(reason) =>
|
|
||||||
context.log.info("cannot start chat room session: {}", reason)
|
|
||||||
Behaviors.stopped
|
|
||||||
//#chatroom-gabbler
|
|
||||||
case SessionGranted(handle) =>
|
|
||||||
handle ! PostMessage("Hello World!")
|
|
||||||
Behaviors.same
|
|
||||||
case MessagePosted(screenName, message) =>
|
|
||||||
context.log.info("message has been posted by '{}': {}", screenName, message)
|
|
||||||
Behaviors.stopped
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//#chatroom-gabbler
|
|
||||||
|
|
||||||
//#chatroom-main
|
//#chatroom-main
|
||||||
val main: Behavior[NotUsed] =
|
val main: Behavior[NotUsed] =
|
||||||
Behaviors.setup { context =>
|
Behaviors.setup { context =>
|
||||||
val chatRoom = context.spawn(ChatRoom.behavior, "chatroom")
|
val chatRoom = context.spawn(ChatRoom(), "chatroom")
|
||||||
val gabblerRef = context.spawn(gabbler, "gabbler")
|
val gabblerRef = context.spawn(Gabbler(), "gabbler")
|
||||||
context.watch(gabblerRef)
|
context.watch(gabblerRef)
|
||||||
chatRoom ! GetSession("ol’ Gabbler", gabblerRef)
|
chatRoom ! ChatRoom.GetSession("ol’ Gabbler", gabblerRef)
|
||||||
|
|
||||||
Behaviors.receiveSignal {
|
Behaviors.receiveSignal {
|
||||||
case (_, Terminated(_)) =>
|
case (_, Terminated(_)) =>
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,9 @@ import org.scalatest.WordSpecLike
|
||||||
|
|
||||||
object OOIntroSpec {
|
object OOIntroSpec {
|
||||||
|
|
||||||
//#chatroom-actor
|
//#chatroom-behavior
|
||||||
object ChatRoom {
|
object ChatRoom {
|
||||||
|
//#chatroom-behavior
|
||||||
//#chatroom-protocol
|
//#chatroom-protocol
|
||||||
sealed trait RoomCommand
|
sealed trait RoomCommand
|
||||||
final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommand
|
final case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommand
|
||||||
|
|
@ -39,8 +40,8 @@ object OOIntroSpec {
|
||||||
//#chatroom-protocol
|
//#chatroom-protocol
|
||||||
//#chatroom-behavior
|
//#chatroom-behavior
|
||||||
|
|
||||||
def behavior(): Behavior[RoomCommand] =
|
def apply(): Behavior[RoomCommand] =
|
||||||
Behaviors.setup[RoomCommand](context => new ChatRoomBehavior(context))
|
Behaviors.setup(context => new ChatRoomBehavior(context))
|
||||||
|
|
||||||
class ChatRoomBehavior(context: ActorContext[RoomCommand]) extends AbstractBehavior[RoomCommand] {
|
class ChatRoomBehavior(context: ActorContext[RoomCommand]) extends AbstractBehavior[RoomCommand] {
|
||||||
private var sessions: List[ActorRef[SessionCommand]] = List.empty
|
private var sessions: List[ActorRef[SessionCommand]] = List.empty
|
||||||
|
|
@ -50,7 +51,7 @@ object OOIntroSpec {
|
||||||
case GetSession(screenName, client) =>
|
case GetSession(screenName, client) =>
|
||||||
// create a child actor for further interaction with the client
|
// create a child actor for further interaction with the client
|
||||||
val ses = context.spawn(
|
val ses = context.spawn(
|
||||||
session(context.self, screenName, client),
|
new SessionBehavior(context.self, screenName, client),
|
||||||
name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name))
|
name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name))
|
||||||
client ! SessionGranted(ses)
|
client ! SessionGranted(ses)
|
||||||
sessions = ses :: sessions
|
sessions = ses :: sessions
|
||||||
|
|
@ -63,23 +64,48 @@ object OOIntroSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def session(
|
private class SessionBehavior(
|
||||||
room: ActorRef[PublishSessionMessage],
|
room: ActorRef[PublishSessionMessage],
|
||||||
screenName: String,
|
screenName: String,
|
||||||
client: ActorRef[SessionEvent]): Behavior[SessionCommand] =
|
client: ActorRef[SessionEvent])
|
||||||
Behaviors.receiveMessage {
|
extends AbstractBehavior[SessionCommand] {
|
||||||
case PostMessage(message) =>
|
|
||||||
// from client, publish to others via the room
|
override def onMessage(msg: SessionCommand): Behavior[SessionCommand] = {
|
||||||
room ! PublishSessionMessage(screenName, message)
|
msg match {
|
||||||
Behaviors.same
|
case PostMessage(message) =>
|
||||||
case NotifyClient(message) =>
|
// from client, publish to others via the room
|
||||||
// published from the room
|
room ! PublishSessionMessage(screenName, message)
|
||||||
client ! message
|
Behaviors.same
|
||||||
Behaviors.same
|
case NotifyClient(message) =>
|
||||||
|
// published from the room
|
||||||
|
client ! message
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//#chatroom-behavior
|
}
|
||||||
|
}
|
||||||
|
//#chatroom-behavior
|
||||||
|
|
||||||
|
//#chatroom-gabbler
|
||||||
|
object Gabbler {
|
||||||
|
import ChatRoom._
|
||||||
|
|
||||||
|
def apply(): Behavior[SessionEvent] =
|
||||||
|
Behaviors.setup { context =>
|
||||||
|
Behaviors.receiveMessage {
|
||||||
|
case SessionDenied(reason) =>
|
||||||
|
context.log.info("cannot start chat room session: {}", reason)
|
||||||
|
Behaviors.stopped
|
||||||
|
case SessionGranted(handle) =>
|
||||||
|
handle ! PostMessage("Hello World!")
|
||||||
|
Behaviors.same
|
||||||
|
case MessagePosted(screenName, message) =>
|
||||||
|
context.log.info("message has been posted by '{}': {}", screenName, message)
|
||||||
|
Behaviors.stopped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#chatroom-gabbler
|
||||||
}
|
}
|
||||||
//#chatroom-actor
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -89,36 +115,17 @@ class OOIntroSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||||
|
|
||||||
"A chat room" must {
|
"A chat room" must {
|
||||||
"chat" in {
|
"chat" in {
|
||||||
//#chatroom-gabbler
|
|
||||||
import ChatRoom._
|
|
||||||
|
|
||||||
val gabbler =
|
|
||||||
Behaviors.setup[SessionEvent] { context =>
|
|
||||||
Behaviors.receiveMessage[SessionEvent] {
|
|
||||||
case SessionDenied(reason) =>
|
|
||||||
context.log.info("cannot start chat room session: {}", reason)
|
|
||||||
Behaviors.stopped
|
|
||||||
case SessionGranted(handle) =>
|
|
||||||
handle ! PostMessage("Hello World!")
|
|
||||||
Behaviors.same
|
|
||||||
case MessagePosted(screenName, message) =>
|
|
||||||
context.log.info("message has been posted by '{}': {}", screenName, message)
|
|
||||||
Behaviors.stopped
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//#chatroom-gabbler
|
|
||||||
|
|
||||||
//#chatroom-main
|
//#chatroom-main
|
||||||
val main: Behavior[String] =
|
val main: Behavior[String] =
|
||||||
Behaviors.setup { context =>
|
Behaviors.setup { context =>
|
||||||
val chatRoom = context.spawn(ChatRoom.behavior(), "chatroom")
|
val chatRoom = context.spawn(ChatRoom(), "chatroom")
|
||||||
val gabblerRef = context.spawn(gabbler, "gabbler")
|
val gabblerRef = context.spawn(Gabbler(), "gabbler")
|
||||||
context.watch(gabblerRef)
|
context.watch(gabblerRef)
|
||||||
|
|
||||||
Behaviors
|
Behaviors
|
||||||
.receiveMessagePartial[String] {
|
.receiveMessagePartial[String] {
|
||||||
case "go" =>
|
case "go" =>
|
||||||
chatRoom ! GetSession("ol’ Gabbler", gabblerRef)
|
chatRoom ! ChatRoom.GetSession("ol’ Gabbler", gabblerRef)
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
}
|
}
|
||||||
.receiveSignal {
|
.receiveSignal {
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,11 @@
|
||||||
|
|
||||||
package akka.actor.typed.javadsl
|
package akka.actor.typed.javadsl
|
||||||
|
|
||||||
|
import java.util.function.Supplier
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
import akka.japi.function.{ Function => JFunction }
|
import akka.japi.function.{ Function => JFunction }
|
||||||
import akka.japi.function.{ Function2 => JFunction2 }
|
|
||||||
import akka.japi.function.{ Predicate => JPredicate }
|
import akka.japi.function.{ Predicate => JPredicate }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.actor.typed.Behavior
|
import akka.actor.typed.Behavior
|
||||||
|
|
@ -39,7 +41,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
* @tparam M type of message to match
|
* @tparam M type of message to match
|
||||||
* @return a new behavior builder with the specified handling appended
|
* @return a new behavior builder with the specified handling appended
|
||||||
*/
|
*/
|
||||||
def onMessage[M <: T](`type`: Class[M], handler: JFunction2[ActorContext[T], M, Behavior[T]]): BehaviorBuilder[T] =
|
def onMessage[M <: T](`type`: Class[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
|
||||||
withMessage(OptionVal.Some(`type`), OptionVal.None, handler)
|
withMessage(OptionVal.Some(`type`), OptionVal.None, handler)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -51,10 +53,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
* @tparam M type of message to match
|
* @tparam M type of message to match
|
||||||
* @return a new behavior builder with the specified handling appended
|
* @return a new behavior builder with the specified handling appended
|
||||||
*/
|
*/
|
||||||
def onMessage[M <: T](
|
def onMessage[M <: T](`type`: Class[M], test: JPredicate[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
|
||||||
`type`: Class[M],
|
|
||||||
test: JPredicate[M],
|
|
||||||
handler: JFunction2[ActorContext[T], M, Behavior[T]]): BehaviorBuilder[T] =
|
|
||||||
withMessage(OptionVal.Some(`type`), OptionVal.Some((t: T) => test.test(t.asInstanceOf[M])), handler)
|
withMessage(OptionVal.Some(`type`), OptionVal.Some((t: T) => test.test(t.asInstanceOf[M])), handler)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -67,9 +66,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
* @param handler action to apply when the type matches
|
* @param handler action to apply when the type matches
|
||||||
* @return a new behavior builder with the specified handling appended
|
* @return a new behavior builder with the specified handling appended
|
||||||
*/
|
*/
|
||||||
def onMessageUnchecked[M <: T](
|
def onMessageUnchecked[M <: T](`type`: Class[_ <: T], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
|
||||||
`type`: Class[_ <: T],
|
|
||||||
handler: JFunction2[ActorContext[T], M, Behavior[T]]): BehaviorBuilder[T] =
|
|
||||||
withMessage[M](OptionVal.Some(`type`.asInstanceOf[Class[M]]), OptionVal.None, handler)
|
withMessage[M](OptionVal.Some(`type`.asInstanceOf[Class[M]]), OptionVal.None, handler)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -79,12 +76,12 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
* @param handler action to apply when the message matches
|
* @param handler action to apply when the message matches
|
||||||
* @return a new behavior builder with the specified handling appended
|
* @return a new behavior builder with the specified handling appended
|
||||||
*/
|
*/
|
||||||
def onMessageEquals(msg: T, handler: JFunction[ActorContext[T], Behavior[T]]): BehaviorBuilder[T] =
|
def onMessageEquals(msg: T, handler: Supplier[Behavior[T]]): BehaviorBuilder[T] =
|
||||||
withMessage[T](
|
withMessage[T](
|
||||||
OptionVal.Some(msg.getClass.asInstanceOf[Class[T]]),
|
OptionVal.Some(msg.getClass.asInstanceOf[Class[T]]),
|
||||||
OptionVal.Some(_.equals(msg)),
|
OptionVal.Some(_.equals(msg)),
|
||||||
new JFunction2[ActorContext[T], T, Behavior[T]] {
|
new JFunction[T, Behavior[T]] {
|
||||||
override def apply(ctx: ActorContext[T], msg: T): Behavior[T] = handler.apply(ctx)
|
override def apply(msg: T): Behavior[T] = handler.get()
|
||||||
})
|
})
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -94,7 +91,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
* @param handler action to apply for any message
|
* @param handler action to apply for any message
|
||||||
* @return a new behavior builder with the specified handling appended
|
* @return a new behavior builder with the specified handling appended
|
||||||
*/
|
*/
|
||||||
def onAnyMessage(handler: JFunction2[ActorContext[T], T, Behavior[T]]): BehaviorBuilder[T] =
|
def onAnyMessage(handler: JFunction[T, Behavior[T]]): BehaviorBuilder[T] =
|
||||||
withMessage(OptionVal.None, OptionVal.None, handler)
|
withMessage(OptionVal.None, OptionVal.None, handler)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -105,10 +102,8 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
* @tparam M type of signal to match
|
* @tparam M type of signal to match
|
||||||
* @return a new behavior builder with the specified handling appended
|
* @return a new behavior builder with the specified handling appended
|
||||||
*/
|
*/
|
||||||
def onSignal[M <: Signal](
|
def onSignal[M <: Signal](`type`: Class[M], handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
|
||||||
`type`: Class[M],
|
withSignal(`type`, OptionVal.None, handler.asInstanceOf[JFunction[Signal, Behavior[T]]])
|
||||||
handler: JFunction2[ActorContext[T], M, Behavior[T]]): BehaviorBuilder[T] =
|
|
||||||
withSignal(`type`, OptionVal.None, handler.asInstanceOf[JFunction2[ActorContext[T], Signal, Behavior[T]]])
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a new predicated case to the signal handling.
|
* Add a new predicated case to the signal handling.
|
||||||
|
|
@ -122,11 +117,11 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
def onSignal[M <: Signal](
|
def onSignal[M <: Signal](
|
||||||
`type`: Class[M],
|
`type`: Class[M],
|
||||||
test: JPredicate[M],
|
test: JPredicate[M],
|
||||||
handler: JFunction2[ActorContext[T], M, Behavior[T]]): BehaviorBuilder[T] =
|
handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] =
|
||||||
withSignal(
|
withSignal(
|
||||||
`type`,
|
`type`,
|
||||||
OptionVal.Some((t: Signal) => test.test(t.asInstanceOf[M])),
|
OptionVal.Some((t: Signal) => test.test(t.asInstanceOf[M])),
|
||||||
handler.asInstanceOf[JFunction2[ActorContext[T], Signal, Behavior[T]]])
|
handler.asInstanceOf[JFunction[Signal, Behavior[T]]])
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a new case to the signal handling matching equal signals.
|
* Add a new case to the signal handling matching equal signals.
|
||||||
|
|
@ -135,17 +130,17 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
* @param handler action to apply when the message matches
|
* @param handler action to apply when the message matches
|
||||||
* @return a new behavior builder with the specified handling appended
|
* @return a new behavior builder with the specified handling appended
|
||||||
*/
|
*/
|
||||||
def onSignalEquals(signal: Signal, handler: Function[ActorContext[T], Behavior[T]]): BehaviorBuilder[T] =
|
def onSignalEquals(signal: Signal, handler: Supplier[Behavior[T]]): BehaviorBuilder[T] =
|
||||||
withSignal(signal.getClass, OptionVal.Some(_.equals(signal)), new JFunction2[ActorContext[T], Signal, Behavior[T]] {
|
withSignal(signal.getClass, OptionVal.Some(_.equals(signal)), new JFunction[Signal, Behavior[T]] {
|
||||||
override def apply(ctx: ActorContext[T], signal: Signal): Behavior[T] = {
|
override def apply(signal: Signal): Behavior[T] = {
|
||||||
handler.apply(ctx)
|
handler.get()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
private def withMessage[M <: T](
|
private def withMessage[M <: T](
|
||||||
clazz: OptionVal[Class[M]],
|
clazz: OptionVal[Class[M]],
|
||||||
test: OptionVal[M => Boolean],
|
test: OptionVal[M => Boolean],
|
||||||
handler: JFunction2[ActorContext[T], M, Behavior[T]]): BehaviorBuilder[T] = {
|
handler: JFunction[M, Behavior[T]]): BehaviorBuilder[T] = {
|
||||||
val newCase = Case(clazz, test, handler)
|
val newCase = Case(clazz, test, handler)
|
||||||
new BehaviorBuilder[T](newCase.asInstanceOf[Case[T, T]] +: messageHandlers, signalHandlers)
|
new BehaviorBuilder[T](newCase.asInstanceOf[Case[T, T]] +: messageHandlers, signalHandlers)
|
||||||
}
|
}
|
||||||
|
|
@ -153,7 +148,7 @@ final class BehaviorBuilder[T] private (messageHandlers: List[Case[T, T]], signa
|
||||||
private def withSignal[M <: Signal](
|
private def withSignal[M <: Signal](
|
||||||
`type`: Class[M],
|
`type`: Class[M],
|
||||||
test: OptionVal[Signal => Boolean],
|
test: OptionVal[Signal => Boolean],
|
||||||
handler: JFunction2[ActorContext[T], Signal, Behavior[T]]): BehaviorBuilder[T] = {
|
handler: JFunction[Signal, Behavior[T]]): BehaviorBuilder[T] = {
|
||||||
new BehaviorBuilder[T](
|
new BehaviorBuilder[T](
|
||||||
messageHandlers,
|
messageHandlers,
|
||||||
Case(OptionVal.Some(`type`), test, handler).asInstanceOf[Case[T, Signal]] +: signalHandlers)
|
Case(OptionVal.Some(`type`), test, handler).asInstanceOf[Case[T, Signal]] +: signalHandlers)
|
||||||
|
|
@ -170,7 +165,7 @@ object BehaviorBuilder {
|
||||||
private[javadsl] final case class Case[BT, MT](
|
private[javadsl] final case class Case[BT, MT](
|
||||||
`type`: OptionVal[Class[_ <: MT]],
|
`type`: OptionVal[Class[_ <: MT]],
|
||||||
test: OptionVal[MT => Boolean],
|
test: OptionVal[MT => Boolean],
|
||||||
handler: JFunction2[ActorContext[BT], MT, Behavior[BT]])
|
handler: JFunction[MT, Behavior[BT]])
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return new empty immutable behavior builder.
|
* @return new empty immutable behavior builder.
|
||||||
|
|
@ -187,18 +182,18 @@ object BehaviorBuilder {
|
||||||
private final class BuiltBehavior[T](messageHandlers: List[Case[T, T]], signalHandlers: List[Case[T, Signal]])
|
private final class BuiltBehavior[T](messageHandlers: List[Case[T, T]], signalHandlers: List[Case[T, Signal]])
|
||||||
extends ExtensibleBehavior[T] {
|
extends ExtensibleBehavior[T] {
|
||||||
|
|
||||||
override def receive(ctx: TypedActorContext[T], msg: T): Behavior[T] = receive(ctx.asJava, msg, messageHandlers)
|
override def receive(ctx: TypedActorContext[T], msg: T): Behavior[T] = receive(msg, messageHandlers)
|
||||||
|
|
||||||
override def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] =
|
override def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] =
|
||||||
receive(ctx.asJava, msg, signalHandlers)
|
receive(msg, signalHandlers)
|
||||||
|
|
||||||
@tailrec
|
@tailrec
|
||||||
private def receive[M](ctx: ActorContext[T], msg: M, handlers: List[Case[T, M]]): Behavior[T] =
|
private def receive[M](msg: M, handlers: List[Case[T, M]]): Behavior[T] =
|
||||||
handlers match {
|
handlers match {
|
||||||
case Case(cls, predicate, handler) :: tail =>
|
case Case(cls, predicate, handler) :: tail =>
|
||||||
if ((cls.isEmpty || cls.get.isAssignableFrom(msg.getClass)) && (predicate.isEmpty || predicate.get.apply(msg)))
|
if ((cls.isEmpty || cls.get.isAssignableFrom(msg.getClass)) && (predicate.isEmpty || predicate.get.apply(msg)))
|
||||||
handler(ctx, msg)
|
handler(msg)
|
||||||
else receive(ctx, msg, tail)
|
else receive(msg, tail)
|
||||||
case Nil =>
|
case Nil =>
|
||||||
Behaviors.unhandled[T]
|
Behaviors.unhandled[T]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -43,14 +43,10 @@ public class ShardingCompileOnlyTest {
|
||||||
|
|
||||||
public static Behavior<CounterCommand> counter(String entityId, Integer value) {
|
public static Behavior<CounterCommand> counter(String entityId, Integer value) {
|
||||||
return Behaviors.receive(CounterCommand.class)
|
return Behaviors.receive(CounterCommand.class)
|
||||||
.onMessage(
|
.onMessage(Increment.class, msg -> counter(entityId, value + 1))
|
||||||
Increment.class,
|
|
||||||
(ctx, msg) -> {
|
|
||||||
return counter(entityId, value + 1);
|
|
||||||
})
|
|
||||||
.onMessage(
|
.onMessage(
|
||||||
GetValue.class,
|
GetValue.class,
|
||||||
(ctx, msg) -> {
|
msg -> {
|
||||||
msg.replyTo.tell(value);
|
msg.replyTo.tell(value);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
|
|
@ -74,32 +70,30 @@ public class ShardingCompileOnlyTest {
|
||||||
|
|
||||||
private static Behavior<CounterCommand> counter2(
|
private static Behavior<CounterCommand> counter2(
|
||||||
ActorRef<ClusterSharding.ShardCommand> shard, String entityId, Integer value) {
|
ActorRef<ClusterSharding.ShardCommand> shard, String entityId, Integer value) {
|
||||||
return Behaviors.receive(CounterCommand.class)
|
return Behaviors.setup(
|
||||||
.onMessage(
|
context ->
|
||||||
Increment.class,
|
Behaviors.receive(CounterCommand.class)
|
||||||
(ctx, msg) -> {
|
.onMessage(Increment.class, msg -> counter(entityId, value + 1))
|
||||||
return counter(entityId, value + 1);
|
.onMessage(
|
||||||
})
|
GetValue.class,
|
||||||
.onMessage(
|
msg -> {
|
||||||
GetValue.class,
|
msg.replyTo.tell(value);
|
||||||
(ctx, msg) -> {
|
return Behaviors.same();
|
||||||
msg.replyTo.tell(value);
|
})
|
||||||
return Behaviors.same();
|
.onMessage(
|
||||||
})
|
Idle.class,
|
||||||
.onMessage(
|
msg -> {
|
||||||
Idle.class,
|
// after receive timeout
|
||||||
(ctx, msg) -> {
|
shard.tell(new ClusterSharding.Passivate<>(context.getSelf()));
|
||||||
// after receive timeout
|
return Behaviors.same();
|
||||||
shard.tell(new ClusterSharding.Passivate<>(ctx.getSelf()));
|
})
|
||||||
return Behaviors.same();
|
.onMessage(
|
||||||
})
|
GoodByeCounter.class,
|
||||||
.onMessage(
|
msg -> {
|
||||||
GoodByeCounter.class,
|
// the stopMessage, used for rebalance and passivate
|
||||||
(ctx, msg) -> {
|
return Behaviors.stopped();
|
||||||
// the stopMessage, used for rebalance and passivate
|
})
|
||||||
return Behaviors.stopped();
|
.build());
|
||||||
})
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
// #counter-passivate
|
// #counter-passivate
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,11 +16,18 @@ import akka.actor.typed.ActorSystem;
|
||||||
|
|
||||||
public class ReceptionistExample {
|
public class ReceptionistExample {
|
||||||
|
|
||||||
|
public
|
||||||
// #ping-service
|
// #ping-service
|
||||||
public static class PingService {
|
static class PingService {
|
||||||
|
|
||||||
|
private final ActorContext<Ping> context;
|
||||||
|
|
||||||
static final ServiceKey<Ping> pingServiceKey = ServiceKey.create(Ping.class, "pingService");
|
static final ServiceKey<Ping> pingServiceKey = ServiceKey.create(Ping.class, "pingService");
|
||||||
|
|
||||||
|
private PingService(ActorContext<Ping> context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
public static class Pong {}
|
public static class Pong {}
|
||||||
|
|
||||||
public static class Ping {
|
public static class Ping {
|
||||||
|
|
@ -31,7 +38,7 @@ public class ReceptionistExample {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static Behavior<Ping> createBehavior() {
|
public static Behavior<Ping> createBehavior() {
|
||||||
return Behaviors.setup(
|
return Behaviors.setup(
|
||||||
context -> {
|
context -> {
|
||||||
context
|
context
|
||||||
|
|
@ -39,11 +46,15 @@ public class ReceptionistExample {
|
||||||
.receptionist()
|
.receptionist()
|
||||||
.tell(Receptionist.register(pingServiceKey, context.getSelf()));
|
.tell(Receptionist.register(pingServiceKey, context.getSelf()));
|
||||||
|
|
||||||
return Behaviors.receive(Ping.class).onMessage(Ping.class, PingService::onPing).build();
|
return new PingService(context).behavior();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Behavior<Ping> onPing(ActorContext<Ping> context, Ping msg) {
|
private Behavior<Ping> behavior() {
|
||||||
|
return Behaviors.receive(Ping.class).onMessage(Ping.class, this::onPing).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<Ping> onPing(Ping msg) {
|
||||||
context.getLog().info("Pinged by {}", msg.replyTo);
|
context.getLog().info("Pinged by {}", msg.replyTo);
|
||||||
msg.replyTo.tell(new Pong());
|
msg.replyTo.tell(new Pong());
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
|
|
@ -51,20 +62,33 @@ public class ReceptionistExample {
|
||||||
}
|
}
|
||||||
// #ping-service
|
// #ping-service
|
||||||
|
|
||||||
|
public
|
||||||
// #pinger
|
// #pinger
|
||||||
public static class Pinger {
|
static class Pinger {
|
||||||
static Behavior<PingService.Pong> createBehavior(ActorRef<PingService.Ping> pingService) {
|
private final ActorContext<PingService.Pong> context;
|
||||||
|
private final ActorRef<PingService.Ping> pingService;
|
||||||
|
|
||||||
|
private Pinger(ActorContext<PingService.Pong> context, ActorRef<PingService.Ping> pingService) {
|
||||||
|
this.context = context;
|
||||||
|
this.pingService = pingService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Behavior<PingService.Pong> createBehavior(
|
||||||
|
ActorRef<PingService.Ping> pingService) {
|
||||||
return Behaviors.setup(
|
return Behaviors.setup(
|
||||||
(ctx) -> {
|
ctx -> {
|
||||||
pingService.tell(new PingService.Ping(ctx.getSelf()));
|
pingService.tell(new PingService.Ping(ctx.getSelf()));
|
||||||
return Behaviors.receive(PingService.Pong.class)
|
return new Pinger(ctx, pingService).behavior();
|
||||||
.onMessage(PingService.Pong.class, Pinger::onPong)
|
|
||||||
.build();
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Behavior<PingService.Pong> onPong(
|
private Behavior<PingService.Pong> behavior() {
|
||||||
ActorContext<PingService.Pong> context, PingService.Pong msg) {
|
return Behaviors.receive(PingService.Pong.class)
|
||||||
|
.onMessage(PingService.Pong.class, this::onPong)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Behavior<PingService.Pong> onPong(PingService.Pong msg) {
|
||||||
context.getLog().info("{} was ponged!!", context.getSelf());
|
context.getLog().info("{} was ponged!!", context.getSelf());
|
||||||
return Behaviors.stopped();
|
return Behaviors.stopped();
|
||||||
}
|
}
|
||||||
|
|
@ -85,7 +109,7 @@ public class ReceptionistExample {
|
||||||
return Behaviors.receive(Object.class)
|
return Behaviors.receive(Object.class)
|
||||||
.onMessage(
|
.onMessage(
|
||||||
Receptionist.Listing.class,
|
Receptionist.Listing.class,
|
||||||
(c, msg) -> {
|
msg -> {
|
||||||
msg.getServiceInstances(PingService.pingServiceKey)
|
msg.getServiceInstances(PingService.pingServiceKey)
|
||||||
.forEach(
|
.forEach(
|
||||||
pingService ->
|
pingService ->
|
||||||
|
|
|
||||||
|
|
@ -31,14 +31,14 @@ public class SingletonCompileOnlyTest {
|
||||||
|
|
||||||
public static Behavior<CounterCommand> counter(String entityId, Integer value) {
|
public static Behavior<CounterCommand> counter(String entityId, Integer value) {
|
||||||
return Behaviors.receive(CounterCommand.class)
|
return Behaviors.receive(CounterCommand.class)
|
||||||
.onMessage(Increment.class, (ctx, msg) -> counter(entityId, value + 1))
|
.onMessage(Increment.class, msg -> counter(entityId, value + 1))
|
||||||
.onMessage(
|
.onMessage(
|
||||||
GetValue.class,
|
GetValue.class,
|
||||||
(ctx, msg) -> {
|
msg -> {
|
||||||
msg.replyTo.tell(value);
|
msg.replyTo.tell(value);
|
||||||
return Behaviors.same();
|
return Behaviors.same();
|
||||||
})
|
})
|
||||||
.onMessage(GoodByeCounter.class, (ctx, msg) -> Behaviors.stopped())
|
.onMessage(GoodByeCounter.class, msg -> Behaviors.stopped())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
// #counter
|
// #counter
|
||||||
|
|
|
||||||
|
|
@ -327,6 +327,9 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
|
||||||
* The `request` parameter in Distributed Data commands was removed, in favor of using `ask`.
|
* The `request` parameter in Distributed Data commands was removed, in favor of using `ask`.
|
||||||
* Removed `Behavior.same`, `Behavior.unhandled`, `Behavior.stopped`, `Behavior.empty`, and `Behavior.ignore` since
|
* Removed `Behavior.same`, `Behavior.unhandled`, `Behavior.stopped`, `Behavior.empty`, and `Behavior.ignore` since
|
||||||
they were redundant with corresponding @scala[scaladsl.Behaviors.x]@java[javadsl.Behaviors.x].
|
they were redundant with corresponding @scala[scaladsl.Behaviors.x]@java[javadsl.Behaviors.x].
|
||||||
|
* `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup`
|
||||||
|
to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`.
|
||||||
|
|
||||||
|
|
||||||
#### Akka Typed Stream API changes
|
#### Akka Typed Stream API changes
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -366,7 +366,7 @@ screen name.
|
||||||
To implement the logic where we spawn a child for the session we need access
|
To implement the logic where we spawn a child for the session we need access
|
||||||
to the `ActorContext`. This is injected as a constructor parameter upon creation
|
to the `ActorContext`. This is injected as a constructor parameter upon creation
|
||||||
of the behavior, note how we combine the `AbstractBehavior` with `Behaviors.setup`
|
of the behavior, note how we combine the `AbstractBehavior` with `Behaviors.setup`
|
||||||
to do this in the `behavior` method.
|
to do this in the @scala[`apply`]@java[`create`] factory method.
|
||||||
|
|
||||||
The behavior that we declare here can handle both subtypes of `RoomCommand`.
|
The behavior that we declare here can handle both subtypes of `RoomCommand`.
|
||||||
`GetSession` has been explained already and the
|
`GetSession` has been explained already and the
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue