/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
|
|
|
|
|
|
|
|
|
|
package akka.contrib.pattern;
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
import scala.concurrent.duration.Duration;
|
|
|
|
|
import akka.actor.ActorRef;
|
|
|
|
|
import akka.actor.ActorSystem;
|
|
|
|
|
import akka.actor.PoisonPill;
|
|
|
|
|
import akka.actor.Props;
|
|
|
|
|
import akka.actor.ReceiveTimeout;
|
|
|
|
|
import akka.japi.Procedure;
|
2014-05-21 01:35:21 +02:00
|
|
|
import akka.persistence.UntypedPersistentActor;
|
2013-11-19 15:53:40 +01:00
|
|
|
|
|
|
|
|
// Doc code, compile only
|
|
|
|
|
public class ClusterShardingTest {
|
|
|
|
|
|
|
|
|
|
ActorSystem system = null;
|
|
|
|
|
|
|
|
|
|
ActorRef getSelf() {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void demonstrateUsage() {
|
|
|
|
|
//#counter-extractor
|
|
|
|
|
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String entryId(Object message) {
|
|
|
|
|
if (message instanceof Counter.EntryEnvelope)
|
|
|
|
|
return String.valueOf(((Counter.EntryEnvelope) message).id);
|
|
|
|
|
else if (message instanceof Counter.Get)
|
|
|
|
|
return String.valueOf(((Counter.Get) message).counterId);
|
|
|
|
|
else
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Object entryMessage(Object message) {
|
|
|
|
|
if (message instanceof Counter.EntryEnvelope)
|
|
|
|
|
return ((Counter.EntryEnvelope) message).payload;
|
|
|
|
|
else
|
|
|
|
|
return message;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String shardId(Object message) {
|
2014-10-27 10:52:34 +01:00
|
|
|
int numberOfShards = 100;
|
2013-11-19 15:53:40 +01:00
|
|
|
if (message instanceof Counter.EntryEnvelope) {
|
|
|
|
|
long id = ((Counter.EntryEnvelope) message).id;
|
2014-10-27 10:52:34 +01:00
|
|
|
return String.valueOf(id % numberOfShards);
|
2013-11-19 15:53:40 +01:00
|
|
|
} else if (message instanceof Counter.Get) {
|
|
|
|
|
long id = ((Counter.Get) message).counterId;
|
2014-10-27 10:52:34 +01:00
|
|
|
return String.valueOf(id % numberOfShards);
|
2013-11-19 15:53:40 +01:00
|
|
|
} else {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
};
|
|
|
|
|
//#counter-extractor
|
|
|
|
|
|
|
|
|
|
//#counter-start
|
2014-10-27 10:52:34 +01:00
|
|
|
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
|
|
|
|
|
Props.create(Counter.class), false, messageExtractor);
|
2013-11-19 15:53:40 +01:00
|
|
|
//#counter-start
|
|
|
|
|
|
|
|
|
|
//#counter-usage
|
|
|
|
|
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
|
2014-10-27 10:52:34 +01:00
|
|
|
counterRegion.tell(new Counter.Get(123), getSelf());
|
2013-11-19 15:53:40 +01:00
|
|
|
|
2014-10-27 10:52:34 +01:00
|
|
|
counterRegion.tell(new Counter.EntryEnvelope(123,
|
2013-11-19 15:53:40 +01:00
|
|
|
Counter.CounterOp.INCREMENT), getSelf());
|
2014-10-27 10:52:34 +01:00
|
|
|
counterRegion.tell(new Counter.Get(123), getSelf());
|
2013-11-19 15:53:40 +01:00
|
|
|
//#counter-usage
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static//#counter-actor
|
2014-05-21 01:35:21 +02:00
|
|
|
public class Counter extends UntypedPersistentActor {
|
2013-11-19 15:53:40 +01:00
|
|
|
|
|
|
|
|
public static enum CounterOp {
|
|
|
|
|
INCREMENT, DECREMENT
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static class Get {
|
|
|
|
|
final public long counterId;
|
|
|
|
|
|
|
|
|
|
public Get(long counterId) {
|
|
|
|
|
this.counterId = counterId;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static class EntryEnvelope {
|
|
|
|
|
final public long id;
|
|
|
|
|
final public Object payload;
|
|
|
|
|
|
|
|
|
|
public EntryEnvelope(long id, Object payload) {
|
|
|
|
|
this.id = id;
|
|
|
|
|
this.payload = payload;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static class CounterChanged {
|
|
|
|
|
final public int delta;
|
|
|
|
|
|
|
|
|
|
public CounterChanged(int delta) {
|
|
|
|
|
this.delta = delta;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int count = 0;
|
2014-07-08 17:51:18 +01:00
|
|
|
|
|
|
|
|
// getSelf().path().parent().parent().name() is the type name (utf-8 URL-encoded)
|
2014-06-26 13:56:01 +02:00
|
|
|
// getSelf().path().name() is the entry identifier (utf-8 URL-encoded)
|
|
|
|
|
@Override
|
|
|
|
|
public String persistenceId() {
|
2014-07-08 17:51:18 +01:00
|
|
|
return getSelf().path().parent().parent().name() + "-" + getSelf().path().name();
|
2014-06-26 13:56:01 +02:00
|
|
|
}
|
2013-11-19 15:53:40 +01:00
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void preStart() throws Exception {
|
|
|
|
|
super.preStart();
|
|
|
|
|
context().setReceiveTimeout(Duration.create(120, TimeUnit.SECONDS));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void updateState(CounterChanged event) {
|
|
|
|
|
count += event.delta;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
2014-01-19 17:46:32 +01:00
|
|
|
public void onReceiveRecover(Object msg) {
|
2013-11-19 15:53:40 +01:00
|
|
|
if (msg instanceof CounterChanged)
|
|
|
|
|
updateState((CounterChanged) msg);
|
|
|
|
|
else
|
|
|
|
|
unhandled(msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void onReceiveCommand(Object msg) {
|
|
|
|
|
if (msg instanceof Get)
|
|
|
|
|
getSender().tell(count, getSelf());
|
|
|
|
|
|
|
|
|
|
else if (msg == CounterOp.INCREMENT)
|
|
|
|
|
persist(new CounterChanged(+1), new Procedure<CounterChanged>() {
|
|
|
|
|
public void apply(CounterChanged evt) {
|
|
|
|
|
updateState(evt);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
else if (msg == CounterOp.DECREMENT)
|
|
|
|
|
persist(new CounterChanged(-1), new Procedure<CounterChanged>() {
|
|
|
|
|
public void apply(CounterChanged evt) {
|
|
|
|
|
updateState(evt);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
else if (msg.equals(ReceiveTimeout.getInstance()))
|
|
|
|
|
getContext().parent().tell(
|
|
|
|
|
new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
|
|
|
|
|
|
|
|
|
|
else
|
|
|
|
|
unhandled(msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//#counter-actor
|
|
|
|
|
|
|
|
|
|
}
|