Java style accessors for AbstractFSM #22592
This commit is contained in:
parent
363ca39f52
commit
07e88300bc
24 changed files with 85 additions and 58 deletions
|
|
@ -36,6 +36,32 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
|
||||||
import java.util.{ List ⇒ JList }
|
import java.util.{ List ⇒ JList }
|
||||||
import FSM._
|
import FSM._
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns this AbstractActor's ActorContext
|
||||||
|
* The ActorContext is not thread safe so do not expose it outside of the
|
||||||
|
* AbstractActor.
|
||||||
|
*/
|
||||||
|
def getContext(): AbstractActor.ActorContext = context.asInstanceOf[AbstractActor.ActorContext]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the ActorRef for this actor.
|
||||||
|
*
|
||||||
|
* Same as `self()`.
|
||||||
|
*/
|
||||||
|
def getSelf(): ActorRef = self
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The reference sender Actor of the currently processed message. This is
|
||||||
|
* always a legal destination to send to, even if there is no logical recipient
|
||||||
|
* for the reply, in which case it will be sent to the dead letter mailbox.
|
||||||
|
*
|
||||||
|
* Same as `sender()`.
|
||||||
|
*
|
||||||
|
* WARNING: Only valid within the Actor itself, so do not close over it and
|
||||||
|
* publish it to other threads!
|
||||||
|
*/
|
||||||
|
def getSender(): ActorRef = sender()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Insert a new StateFunction at the end of the processing chain for the
|
* Insert a new StateFunction at the end of the processing chain for the
|
||||||
* given state.
|
* given state.
|
||||||
|
|
|
||||||
|
|
@ -398,7 +398,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
||||||
getSender().tell("service unavailable, shutting down", getSelf())
|
getSender().tell("service unavailable, shutting down", getSelf())
|
||||||
)
|
)
|
||||||
.match(Terminated.class, t -> t.actor().equals(worker), t ->
|
.match(Terminated.class, t -> t.actor().equals(worker), t ->
|
||||||
getContext().stop(self())
|
getContext().stop(getSelf())
|
||||||
)
|
)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
@ -716,7 +716,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
||||||
getContext().become(active(ref));
|
getContext().become(active(ref));
|
||||||
})
|
})
|
||||||
.match(ActorIdentity.class, id -> !id.getActorRef().isPresent(), id -> {
|
.match(ActorIdentity.class, id -> !id.getActorRef().isPresent(), id -> {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
@ -724,7 +724,7 @@ public class ActorDocTest extends AbstractJavaTest {
|
||||||
final AbstractActor.Receive active(final ActorRef another) {
|
final AbstractActor.Receive active(final ActorRef another) {
|
||||||
return receiveBuilder()
|
return receiveBuilder()
|
||||||
.match(Terminated.class, t -> t.actor().equals(another), t ->
|
.match(Terminated.class, t -> t.actor().equals(another), t ->
|
||||||
getContext().stop(self())
|
getContext().stop(getSelf())
|
||||||
)
|
)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
*/
|
*/
|
||||||
package jdocs.actor.japi;
|
package jdocs.actor;
|
||||||
|
|
||||||
//#all
|
//#all
|
||||||
//#imports
|
//#imports
|
||||||
|
|
@ -14,6 +14,7 @@ import akka.actor.*;
|
||||||
import akka.dispatch.Mapper;
|
import akka.dispatch.Mapper;
|
||||||
import akka.event.LoggingReceive;
|
import akka.event.LoggingReceive;
|
||||||
import akka.japi.pf.DeciderBuilder;
|
import akka.japi.pf.DeciderBuilder;
|
||||||
|
import akka.pattern.Patterns;
|
||||||
import akka.util.Timeout;
|
import akka.util.Timeout;
|
||||||
import com.typesafe.config.Config;
|
import com.typesafe.config.Config;
|
||||||
import com.typesafe.config.ConfigFactory;
|
import com.typesafe.config.ConfigFactory;
|
||||||
|
|
@ -27,10 +28,10 @@ import static akka.actor.SupervisorStrategy.escalate;
|
||||||
import static akka.pattern.Patterns.ask;
|
import static akka.pattern.Patterns.ask;
|
||||||
import static akka.pattern.Patterns.pipe;
|
import static akka.pattern.Patterns.pipe;
|
||||||
|
|
||||||
import static jdocs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
|
import static jdocs.actor.FaultHandlingDocSample.WorkerApi.*;
|
||||||
import static jdocs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
|
import static jdocs.actor.FaultHandlingDocSample.CounterServiceApi.*;
|
||||||
import static jdocs.actor.japi.FaultHandlingDocSample.CounterApi.*;
|
import static jdocs.actor.FaultHandlingDocSample.CounterApi.*;
|
||||||
import static jdocs.actor.japi.FaultHandlingDocSample.StorageApi.*;
|
import static jdocs.actor.FaultHandlingDocSample.StorageApi.*;
|
||||||
|
|
||||||
//#imports
|
//#imports
|
||||||
|
|
||||||
|
|
@ -148,7 +149,7 @@ public class FaultHandlingDocSample {
|
||||||
counterService.tell(new Increment(1), getSelf());
|
counterService.tell(new Increment(1), getSelf());
|
||||||
counterService.tell(new Increment(1), getSelf());
|
counterService.tell(new Increment(1), getSelf());
|
||||||
// Send current progress to the initial sender
|
// Send current progress to the initial sender
|
||||||
pipe(ask(counterService, GetCurrentCount, askTimeout)
|
pipe(Patterns.ask(counterService, GetCurrentCount, askTimeout)
|
||||||
.mapTo(classTag(CurrentCount.class))
|
.mapTo(classTag(CurrentCount.class))
|
||||||
.map(new Mapper<CurrentCount, Progress>() {
|
.map(new Mapper<CurrentCount, Progress>() {
|
||||||
public Progress apply(CurrentCount c) {
|
public Progress apply(CurrentCount c) {
|
||||||
|
|
@ -39,7 +39,7 @@ public class Buncher extends AbstractFSM<State, Data> {
|
||||||
// reuse this matcher
|
// reuse this matcher
|
||||||
final UnitMatch<Data> m = UnitMatch.create(
|
final UnitMatch<Data> m = UnitMatch.create(
|
||||||
matchData(Todo.class,
|
matchData(Todo.class,
|
||||||
todo -> todo.getTarget().tell(new Batch(todo.getQueue()), self())));
|
todo -> todo.getTarget().tell(new Batch(todo.getQueue()), getSelf())));
|
||||||
m.match(stateData());
|
m.match(stateData());
|
||||||
}).
|
}).
|
||||||
state(Idle, Active, () -> {/* Do something here */}));
|
state(Idle, Active, () -> {/* Do something here */}));
|
||||||
|
|
|
||||||
|
|
@ -131,10 +131,10 @@ public class FSMDocTest extends AbstractJavaTest {
|
||||||
log().warning("Failure in state " + state + " with data " + data + "\n" +
|
log().warning("Failure in state " + state + " with data " + data + "\n" +
|
||||||
"Events leading up to this point:\n\t" + lastEvents);
|
"Events leading up to this point:\n\t" + lastEvents);
|
||||||
//#logging-fsm
|
//#logging-fsm
|
||||||
target.tell(reason.cause(), self());
|
target.tell(reason.cause(), getSelf());
|
||||||
target.tell(state, self());
|
target.tell(state, getSelf());
|
||||||
target.tell(data, self());
|
target.tell(data, getSelf());
|
||||||
target.tell(lastEvents, self());
|
target.tell(lastEvents, getSelf());
|
||||||
//#logging-fsm
|
//#logging-fsm
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
@ -143,11 +143,11 @@ public class FSMDocTest extends AbstractJavaTest {
|
||||||
startWith(SomeState, Data.Foo);
|
startWith(SomeState, Data.Foo);
|
||||||
when(SomeState, matchEvent(ActorRef.class, Data.class, (ref, data) -> {
|
when(SomeState, matchEvent(ActorRef.class, Data.class, (ref, data) -> {
|
||||||
target = ref;
|
target = ref;
|
||||||
target.tell("going active", self());
|
target.tell("going active", getSelf());
|
||||||
return goTo(Active);
|
return goTo(Active);
|
||||||
}));
|
}));
|
||||||
when(Active, matchEventEquals("stop", (event, data) -> {
|
when(Active, matchEventEquals("stop", (event, data) -> {
|
||||||
target.tell("stopping", self());
|
target.tell("stopping", getSelf());
|
||||||
return stop(new Failure("This is not the error you're looking for"));
|
return stop(new Failure("This is not the error you're looking for"));
|
||||||
}));
|
}));
|
||||||
initialize();
|
initialize();
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ public class FactorialBackend extends AbstractActor {
|
||||||
CompletableFuture.supplyAsync(() -> factorial(n))
|
CompletableFuture.supplyAsync(() -> factorial(n))
|
||||||
.thenApply((factorial) -> new FactorialResult(n, factorial));
|
.thenApply((factorial) -> new FactorialResult(n, factorial));
|
||||||
|
|
||||||
pipe(result, getContext().dispatcher()).to(sender());
|
pipe(result, getContext().dispatcher()).to(getSender());
|
||||||
|
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ public class FactorialFrontend extends AbstractActor {
|
||||||
if (repeat)
|
if (repeat)
|
||||||
sendJobs();
|
sendJobs();
|
||||||
else
|
else
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.match(ReceiveTimeout.class, x -> {
|
.match(ReceiveTimeout.class, x -> {
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ public class StatsSampleClient extends AbstractActor {
|
||||||
//re-subscribe when restart
|
//re-subscribe when restart
|
||||||
@Override
|
@Override
|
||||||
public void postStop() {
|
public void postStop() {
|
||||||
cluster.unsubscribe(self());
|
cluster.unsubscribe(getSelf());
|
||||||
tickTask.cancel();
|
tickTask.cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,13 +18,13 @@ public class TransformationBackend extends AbstractActor {
|
||||||
//subscribe to cluster changes, MemberUp
|
//subscribe to cluster changes, MemberUp
|
||||||
@Override
|
@Override
|
||||||
public void preStart() {
|
public void preStart() {
|
||||||
cluster.subscribe(self(), MemberUp.class);
|
cluster.subscribe(getSelf(), MemberUp.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
//re-subscribe when restart
|
//re-subscribe when restart
|
||||||
@Override
|
@Override
|
||||||
public void postStop() {
|
public void postStop() {
|
||||||
cluster.unsubscribe(self());
|
cluster.unsubscribe(getSelf());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,8 @@ public class TransformationFrontend extends AbstractActor {
|
||||||
.forward(job, getContext());
|
.forward(job, getContext());
|
||||||
})
|
})
|
||||||
.matchEquals(BACKEND_REGISTRATION, x -> {
|
.matchEquals(BACKEND_REGISTRATION, x -> {
|
||||||
getContext().watch(sender());
|
getContext().watch(getSender());
|
||||||
backends.add(sender());
|
backends.add(getSender());
|
||||||
})
|
})
|
||||||
.match(Terminated.class, terminated -> {
|
.match(Terminated.class, terminated -> {
|
||||||
backends.remove(terminated.getActor());
|
backends.remove(terminated.getActor());
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,7 @@ public class DistributedDataDocTest extends AbstractJavaTest {
|
||||||
return receiveBuilder()
|
return receiveBuilder()
|
||||||
.match(String.class, a -> a.equals("increment"), a -> {
|
.match(String.class, a -> a.equals("increment"), a -> {
|
||||||
// incoming command to increase the counter
|
// incoming command to increase the counter
|
||||||
Optional<Object> reqContext = Optional.of(sender());
|
Optional<Object> reqContext = Optional.of(getSender());
|
||||||
Replicator.Update<PNCounter> upd = new Replicator.Update<PNCounter>(counter1Key,
|
Replicator.Update<PNCounter> upd = new Replicator.Update<PNCounter>(counter1Key,
|
||||||
PNCounter.create(), writeTwo, reqContext, curr -> curr.increment(node, 1));
|
PNCounter.create(), writeTwo, reqContext, curr -> curr.increment(node, 1));
|
||||||
replicator.tell(upd, getSelf());
|
replicator.tell(upd, getSelf());
|
||||||
|
|
@ -217,7 +217,7 @@ public class DistributedDataDocTest extends AbstractJavaTest {
|
||||||
return receiveBuilder()
|
return receiveBuilder()
|
||||||
.match(String.class, a -> a.equals("get-count"), a -> {
|
.match(String.class, a -> a.equals("get-count"), a -> {
|
||||||
// incoming request to retrieve current value of the counter
|
// incoming request to retrieve current value of the counter
|
||||||
Optional<Object> reqContext = Optional.of(sender());
|
Optional<Object> reqContext = Optional.of(getSender());
|
||||||
replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
|
replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
|
||||||
readTwo), getSelf());
|
readTwo), getSelf());
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ public class JavaReadBackPressure {
|
||||||
tcp = Tcp.get(getContext().getSystem()).manager();
|
tcp = Tcp.get(getContext().getSystem()).manager();
|
||||||
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
|
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
|
||||||
tcp.tell(
|
tcp.tell(
|
||||||
TcpMessage.bind(self(), new InetSocketAddress("localhost", 0), 100, options, true),
|
TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100, options, true),
|
||||||
getSelf()
|
getSelf()
|
||||||
);
|
);
|
||||||
//#pull-mode-bind
|
//#pull-mode-bind
|
||||||
|
|
|
||||||
|
|
@ -73,7 +73,7 @@ public class JavaUdpMulticast {
|
||||||
final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
|
final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
|
||||||
// listen for datagrams on this address
|
// listen for datagrams on this address
|
||||||
InetSocketAddress endpoint = new InetSocketAddress(port);
|
InetSocketAddress endpoint = new InetSocketAddress(port);
|
||||||
mgr.tell(UdpMessage.bind(self(), endpoint, options), getSelf());
|
mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf());
|
||||||
//#bind
|
//#bind
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ public class UdpDocTest {
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder()
|
return receiveBuilder()
|
||||||
.match(Udp.SimpleSenderReady.class, message -> {
|
.match(Udp.SimpleSenderReady.class, message -> {
|
||||||
getContext().become(ready(sender()));
|
getContext().become(ready(getSender()));
|
||||||
//#sender
|
//#sender
|
||||||
getSender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), getSelf());
|
getSender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), getSelf());
|
||||||
//#sender
|
//#sender
|
||||||
|
|
@ -68,7 +68,7 @@ public class UdpDocTest {
|
||||||
// request creation of a bound listen socket
|
// request creation of a bound listen socket
|
||||||
final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
|
final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
|
||||||
mgr.tell(
|
mgr.tell(
|
||||||
UdpMessage.bind(self(), new InetSocketAddress("localhost", 0)),
|
UdpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0)),
|
||||||
getSelf());
|
getSelf());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -116,14 +116,14 @@ public class UdpDocTest {
|
||||||
|
|
||||||
// create a restricted a.k.a. “connected” socket
|
// create a restricted a.k.a. “connected” socket
|
||||||
final ActorRef mgr = UdpConnected.get(getContext().getSystem()).getManager();
|
final ActorRef mgr = UdpConnected.get(getContext().getSystem()).getManager();
|
||||||
mgr.tell(UdpConnectedMessage.connect(self(), remote), getSelf());
|
mgr.tell(UdpConnectedMessage.connect(getSelf(), remote), getSelf());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder()
|
return receiveBuilder()
|
||||||
.match(UdpConnected.Connected.class, message -> {
|
.match(UdpConnected.Connected.class, message -> {
|
||||||
getContext().become(ready(sender()));
|
getContext().become(ready(getSender()));
|
||||||
//#connected
|
//#connected
|
||||||
getSender()
|
getSender()
|
||||||
.tell(UdpConnectedMessage.send(ByteString.fromString("hello")),
|
.tell(UdpConnectedMessage.send(ByteString.fromString("hello")),
|
||||||
|
|
@ -154,7 +154,7 @@ public class UdpDocTest {
|
||||||
connection.tell(message, getSelf());
|
connection.tell(message, getSelf());
|
||||||
})
|
})
|
||||||
.match(UdpConnected.Disconnected.class, x -> {
|
.match(UdpConnected.Disconnected.class, x -> {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,7 @@ public class EchoHandler extends AbstractActor {
|
||||||
.match(ConnectionClosed.class, msg -> {
|
.match(ConnectionClosed.class, msg -> {
|
||||||
if (msg.isPeerClosed()) {
|
if (msg.isPeerClosed()) {
|
||||||
if (storage.isEmpty()) {
|
if (storage.isEmpty()) {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
} else {
|
} else {
|
||||||
getContext().become(closing());
|
getContext().become(closing());
|
||||||
}
|
}
|
||||||
|
|
@ -119,7 +119,7 @@ public class EchoHandler extends AbstractActor {
|
||||||
if (msg.isPeerClosed())
|
if (msg.isPeerClosed())
|
||||||
state.peerClosed = true;
|
state.peerClosed = true;
|
||||||
else
|
else
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
|
|
||||||
})
|
})
|
||||||
.match(Integer.class, ack -> {
|
.match(Integer.class, ack -> {
|
||||||
|
|
@ -130,7 +130,7 @@ public class EchoHandler extends AbstractActor {
|
||||||
|
|
||||||
if (storage.isEmpty()) {
|
if (storage.isEmpty()) {
|
||||||
if (state.peerClosed)
|
if (state.peerClosed)
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
else
|
else
|
||||||
getContext().become(writing);
|
getContext().become(writing);
|
||||||
|
|
||||||
|
|
@ -165,7 +165,7 @@ public class EchoHandler extends AbstractActor {
|
||||||
.match(Integer.class, msg -> {
|
.match(Integer.class, msg -> {
|
||||||
acknowledge(msg);
|
acknowledge(msg);
|
||||||
if (storage.isEmpty())
|
if (storage.isEmpty())
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
@ -197,7 +197,7 @@ public class EchoHandler extends AbstractActor {
|
||||||
|
|
||||||
if (stored > MAX_STORED) {
|
if (stored > MAX_STORED) {
|
||||||
log.warning("drop connection to [{}] (buffer overrun)", remote);
|
log.warning("drop connection to [{}] (buffer overrun)", remote);
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
|
|
||||||
} else if (stored > HIGH_WATERMARK) {
|
} else if (stored > HIGH_WATERMARK) {
|
||||||
log.debug("suspending reading at {}", currentOffset());
|
log.debug("suspending reading at {}", currentOffset());
|
||||||
|
|
|
||||||
|
|
@ -40,14 +40,14 @@ public class EchoManager extends AbstractActor {
|
||||||
final ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();
|
final ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();
|
||||||
//#manager
|
//#manager
|
||||||
tcpManager.tell(
|
tcpManager.tell(
|
||||||
TcpMessage.bind(self(), new InetSocketAddress("localhost", 0), 100),
|
TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100),
|
||||||
getSelf());
|
getSelf());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postRestart(Throwable arg0) throws Exception {
|
public void postRestart(Throwable arg0) throws Exception {
|
||||||
// do not restart
|
// do not restart
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -59,7 +59,7 @@ public class EchoManager extends AbstractActor {
|
||||||
.match(Tcp.CommandFailed.class, failed -> {
|
.match(Tcp.CommandFailed.class, failed -> {
|
||||||
if (failed.cmd() instanceof Bind) {
|
if (failed.cmd() instanceof Bind) {
|
||||||
log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).localAddress());
|
log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).localAddress());
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
} else {
|
} else {
|
||||||
log.warning("unknown command failed [{}]", failed.cmd());
|
log.warning("unknown command failed [{}]", failed.cmd());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ public class IODocTest extends AbstractJavaTest {
|
||||||
@Override
|
@Override
|
||||||
public void preStart() throws Exception {
|
public void preStart() throws Exception {
|
||||||
final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
|
final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
|
||||||
tcp.tell(TcpMessage.bind(self(),
|
tcp.tell(TcpMessage.bind(getSelf(),
|
||||||
new InetSocketAddress("localhost", 0), 100), getSelf());
|
new InetSocketAddress("localhost", 0), 100), getSelf());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -60,7 +60,7 @@ public class IODocTest extends AbstractJavaTest {
|
||||||
|
|
||||||
})
|
})
|
||||||
.match(CommandFailed.class, msg -> {
|
.match(CommandFailed.class, msg -> {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
|
|
||||||
})
|
})
|
||||||
.match(Connected.class, conn -> {
|
.match(Connected.class, conn -> {
|
||||||
|
|
@ -87,7 +87,7 @@ public class IODocTest extends AbstractJavaTest {
|
||||||
getSender().tell(TcpMessage.write(data), getSelf());
|
getSender().tell(TcpMessage.write(data), getSelf());
|
||||||
})
|
})
|
||||||
.match(ConnectionClosed.class, msg -> {
|
.match(ConnectionClosed.class, msg -> {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
@ -118,13 +118,13 @@ public class IODocTest extends AbstractJavaTest {
|
||||||
return receiveBuilder()
|
return receiveBuilder()
|
||||||
.match(CommandFailed.class, msg -> {
|
.match(CommandFailed.class, msg -> {
|
||||||
listener.tell("failed", getSelf());
|
listener.tell("failed", getSelf());
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
|
|
||||||
})
|
})
|
||||||
.match(Connected.class, msg -> {
|
.match(Connected.class, msg -> {
|
||||||
listener.tell(msg, getSelf());
|
listener.tell(msg, getSelf());
|
||||||
getSender().tell(TcpMessage.register(self()), getSelf());
|
getSender().tell(TcpMessage.register(getSelf()), getSelf());
|
||||||
getContext().become(connected(sender()));
|
getContext().become(connected(getSender()));
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
@ -144,7 +144,7 @@ public class IODocTest extends AbstractJavaTest {
|
||||||
connection.tell(TcpMessage.close(), getSelf());
|
connection.tell(TcpMessage.close(), getSelf());
|
||||||
})
|
})
|
||||||
.match(ConnectionClosed.class, msg -> {
|
.match(ConnectionClosed.class, msg -> {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ public class SimpleEchoHandler extends AbstractActor {
|
||||||
|
|
||||||
})
|
})
|
||||||
.match(ConnectionClosed.class, msg -> {
|
.match(ConnectionClosed.class, msg -> {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
@ -71,7 +71,7 @@ public class SimpleEchoHandler extends AbstractActor {
|
||||||
closing = true;
|
closing = true;
|
||||||
} else {
|
} else {
|
||||||
// could also be ErrorClosed, in which case we just give up
|
// could also be ErrorClosed, in which case we just give up
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -98,7 +98,7 @@ public class SimpleEchoHandler extends AbstractActor {
|
||||||
|
|
||||||
if (stored > maxStored) {
|
if (stored > maxStored) {
|
||||||
log.warning("drop connection to [{}] (buffer overrun)", remote);
|
log.warning("drop connection to [{}] (buffer overrun)", remote);
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
|
|
||||||
} else if (stored > highWatermark) {
|
} else if (stored > highWatermark) {
|
||||||
log.debug("suspending reading");
|
log.debug("suspending reading");
|
||||||
|
|
@ -120,7 +120,7 @@ public class SimpleEchoHandler extends AbstractActor {
|
||||||
|
|
||||||
if (storage.isEmpty()) {
|
if (storage.isEmpty()) {
|
||||||
if (closing) {
|
if (closing) {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
} else {
|
} else {
|
||||||
getContext().unbecome();
|
getContext().unbecome();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ public class Watcher extends AbstractActor {
|
||||||
})
|
})
|
||||||
.match(Terminated.class, msg -> {
|
.match(Terminated.class, msg -> {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
if (latch.getCount() == 0) getContext().stop(self());
|
if (latch.getCount() == 0) getContext().stop(getSelf());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -91,7 +91,7 @@ public class ActorPublisherDocTest extends AbstractJavaTest {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.match(ActorPublisherMessage.Request.class, request -> deliverBuf())
|
.match(ActorPublisherMessage.Request.class, request -> deliverBuf())
|
||||||
.match(ActorPublisherMessage.Cancel.class, cancel -> getContext().stop(self()))
|
.match(ActorPublisherMessage.Cancel.class, cancel -> getContext().stop(getSelf()))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -179,7 +179,7 @@ public class ActorSubscriberDocTest extends AbstractJavaTest {
|
||||||
})
|
})
|
||||||
.match(ActorSubscriberMessage.onCompleteInstance().getClass(), complete -> {
|
.match(ActorSubscriberMessage.onCompleteInstance().getClass(), complete -> {
|
||||||
if (queue.isEmpty()) {
|
if (queue.isEmpty()) {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.match(WorkerPoolProtocol.Reply.class, reply -> {
|
.match(WorkerPoolProtocol.Reply.class, reply -> {
|
||||||
|
|
@ -187,7 +187,7 @@ public class ActorSubscriberDocTest extends AbstractJavaTest {
|
||||||
queue.get(id).tell(WorkerPoolProtocol.done(id), getSelf());
|
queue.get(id).tell(WorkerPoolProtocol.done(id), getSelf());
|
||||||
queue.remove(id);
|
queue.remove(id);
|
||||||
if (canceled() && queue.isEmpty()) {
|
if (canceled() && queue.isEmpty()) {
|
||||||
getContext().stop(self());
|
getContext().stop(getSelf());
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
|
||||||
|
|
@ -113,7 +113,7 @@ public class RecipeGlobalRateLimit extends RecipeTest {
|
||||||
releaseWaiting();
|
releaseWaiting();
|
||||||
})
|
})
|
||||||
.match(WantToPass.class, wtp -> {
|
.match(WantToPass.class, wtp -> {
|
||||||
waitQueue.add(sender());
|
waitQueue.add(getSender());
|
||||||
})
|
})
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -177,7 +177,7 @@ public class ParentChildTest extends AbstractJavaTest {
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder()
|
return receiveBuilder()
|
||||||
.matchAny(message -> {
|
.matchAny(message -> {
|
||||||
if (sender().equals(child)) {
|
if (getSender().equals(child)) {
|
||||||
proxy.ref().forward(message, getContext());
|
proxy.ref().forward(message, getContext());
|
||||||
} else {
|
} else {
|
||||||
child.forward(message, getContext());
|
child.forward(message, getContext());
|
||||||
|
|
|
||||||
|
|
@ -49,5 +49,5 @@ Step Description
|
||||||
Full Source Code of the Fault Tolerance Sample
|
Full Source Code of the Fault Tolerance Sample
|
||||||
------------------------------------------------------
|
------------------------------------------------------
|
||||||
|
|
||||||
.. includecode:: code/jdocs/actor/japi/FaultHandlingDocSample.java#all
|
.. includecode:: code/jdocs/actor/FaultHandlingDocSample.java#all
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue