pekko/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java

174 lines
4.6 KiB
Java
Raw Normal View History

/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.Procedure;
import akka.japi.Option;
// !per persistAsync
//
// Breaks binary compatibility because new methods are added to the Eventsourced trait.
// Since akka-persistence is experimental this is OK, and source-level compatibility
// has thankfully been preserved :-)
//
// Deprecates:
// * Rename of EventsourcedProcessor -> PersistentActor
// * Processor -> suggest using PersistentActor
// * Migration guide for akka-persistence is separate, as we'll deprecate in minor
//   versions (it's experimental)
// * Persistent as well as ConfirmablePersistent - since Processor, their main user,
//   will be removed soon.
//
// Other changes:
// * persistAsync works as expected when mixed with persist
// * A counter must be kept for pending stashing invocations
// * Uses only 1 shared list buffer for persist / persistAsync
// * Includes small benchmark
// * Docs also include info about not using Persistent() wrapper
// * Uses java LinkedList, for best performance of append / head on persistInvocations;
//   the get(0) is safe, because these msgs only come in response to persistInvocations
// * Renamed internal *MessagesSuccess/Failure messages because we kept making small
//   mistakes, seeing the class "with s" and "without s" as the same
// * Updated everything that referred to EventsourcedProcessor to PersistentActor,
//   including samples
//
// Refs #15227
//
// Conflicts:
//   akka-docs/rst/project/migration-guides.rst
//   akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala
//   akka-persistence/src/main/scala/akka/persistence/Persistent.scala
//   akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala
//   project/AkkaBuild.scala
//
// 2014-05-21 01:35:21 +02:00
import akka.persistence.UntypedPersistentActor;
// Doc code, compile only
public class ClusterShardingTest {
ActorSystem system = null;
ActorRef getSelf() {
return null;
}
public void demonstrateUsage() {
//#counter-extractor
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
@Override
public String entryId(Object message) {
if (message instanceof Counter.EntryEnvelope)
return String.valueOf(((Counter.EntryEnvelope) message).id);
else if (message instanceof Counter.Get)
return String.valueOf(((Counter.Get) message).counterId);
else
return null;
}
@Override
public Object entryMessage(Object message) {
if (message instanceof Counter.EntryEnvelope)
return ((Counter.EntryEnvelope) message).payload;
else
return message;
}
@Override
public String shardId(Object message) {
2014-10-27 10:52:34 +01:00
int numberOfShards = 100;
if (message instanceof Counter.EntryEnvelope) {
long id = ((Counter.EntryEnvelope) message).id;
2014-10-27 10:52:34 +01:00
return String.valueOf(id % numberOfShards);
} else if (message instanceof Counter.Get) {
long id = ((Counter.Get) message).counterId;
2014-10-27 10:52:34 +01:00
return String.valueOf(id % numberOfShards);
} else {
return null;
}
}
};
//#counter-extractor
//#counter-start
Option<String> roleOption = Option.none();
2014-10-27 10:52:34 +01:00
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor);
//#counter-start
//#counter-usage
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
2014-10-27 10:52:34 +01:00
counterRegion.tell(new Counter.Get(123), getSelf());
2014-10-27 10:52:34 +01:00
counterRegion.tell(new Counter.EntryEnvelope(123,
Counter.CounterOp.INCREMENT), getSelf());
2014-10-27 10:52:34 +01:00
counterRegion.tell(new Counter.Get(123), getSelf());
//#counter-usage
}
static//#counter-actor
!per persistAsync Breaks binary compatibility because adding new methods to Eventsourced trait. Since akka-persistence is experimental this is ok, yet source-level compatibility has been perserved thankfuly :-) Deprecates: * Rename of EventsourcedProcessor -> PersistentActor * Processor -> suggest using PersistentActor * Migration guide for akka-persistence is separate, as wel'll deprecate in minor versions (its experimental) * Persistent as well as ConfirmablePersistent - since Processor, their main user will be removed soon. Other changes: * persistAsync works as expected when mixed with persist * A counter must be kept for pending stashing invocations * Uses only 1 shared list buffer for persit / persistAsync * Includes small benchmark * Docs also include info about not using Persistent() wrapper * uses java LinkedList, for best performance of append / head on persistInvocations; the get(0) is safe, because these msgs only come in response to persistInvocations * Renamed internal *MessagesSuccess/Failure messages because we kept small mistakes seeing the class "with s" and "without s" as the same * Updated everything that refered to EventsourcedProcessor to PersistentActor, including samples Refs #15227 Conflicts: akka-docs/rst/project/migration-guides.rst akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala project/AkkaBuild.scala
2014-05-21 01:35:21 +02:00
public class Counter extends UntypedPersistentActor {
public static enum CounterOp {
INCREMENT, DECREMENT
}
public static class Get {
final public long counterId;
public Get(long counterId) {
this.counterId = counterId;
}
}
public static class EntryEnvelope {
final public long id;
final public Object payload;
public EntryEnvelope(long id, Object payload) {
this.id = id;
this.payload = payload;
}
}
public static class CounterChanged {
final public int delta;
public CounterChanged(int delta) {
this.delta = delta;
}
}
int count = 0;
// getSelf().path().parent().parent().name() is the type name (utf-8 URL-encoded)
// getSelf().path().name() is the entry identifier (utf-8 URL-encoded)
@Override
public String persistenceId() {
return getSelf().path().parent().parent().name() + "-" + getSelf().path().name();
}
@Override
public void preStart() throws Exception {
super.preStart();
context().setReceiveTimeout(Duration.create(120, TimeUnit.SECONDS));
}
void updateState(CounterChanged event) {
count += event.delta;
}
@Override
public void onReceiveRecover(Object msg) {
if (msg instanceof CounterChanged)
updateState((CounterChanged) msg);
else
unhandled(msg);
}
@Override
public void onReceiveCommand(Object msg) {
if (msg instanceof Get)
getSender().tell(count, getSelf());
else if (msg == CounterOp.INCREMENT)
persist(new CounterChanged(+1), new Procedure<CounterChanged>() {
public void apply(CounterChanged evt) {
updateState(evt);
}
});
else if (msg == CounterOp.DECREMENT)
persist(new CounterChanged(-1), new Procedure<CounterChanged>() {
public void apply(CounterChanged evt) {
updateState(evt);
}
});
else if (msg.equals(ReceiveTimeout.getInstance()))
getContext().parent().tell(
new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
else
unhandled(msg);
}
}
//#counter-actor
}