Java docs and sample of durable mailbox, see #2761

* Added abstract class DurableMessageQueueWithSerialization
(cherry picked from commit 49720e8cd09243ffea9f02c245e0053c126bf555)
This commit is contained in:
Patrik Nordwall 2013-02-20 12:52:28 +01:00
parent b2cd087ac6
commit dc9ac4dc57
14 changed files with 266 additions and 15 deletions

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.mailbox
import org.scalatest.junit.JUnitSuite
class DurableMailboxDocTest extends DurableMailboxDocTestBase with JUnitSuite

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.mailbox;
//#imports
import akka.actor.Props;
import akka.actor.ActorRef;
//#imports
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
public class DurableMailboxDocTestBase {
ActorSystem system;
@Before
public void setUp() {
system = ActorSystem.create("MySystem",
ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf()));
}
@After
public void tearDown() {
system.shutdown();
}
@Test
public void configDefinedDispatcher() {
//#dispatcher-config-use
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).
withDispatcher("my-dispatcher"), "myactor");
//#dispatcher-config-use
myActor.tell("test", null);
}
public static class MyUntypedActor extends UntypedActor {
public void onReceive(Object message) {
}
}
}

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.mailbox
import akka.actor.mailbox.DurableMailboxSpec
object MyDurableMailboxDocSpec {
val config = """
MyStorage-dispatcher {
mailbox-type = docs.actor.mailbox.MyDurableMailboxType
}
"""
}
class MyDurableMailboxDocSpec extends DurableMailboxSpec("MyStorage", MyDurableMailboxDocSpec.config) {
override def atStartup() {
}
override def afterTermination() {
}
"MyDurableMailbox (Java)" must {
"deliver a message" in {
val actor = createMailboxTestActor()
implicit val sender = testActor
actor ! "hello"
expectMsg("hello")
}
}
}

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.mailbox;
//#custom-mailbox-type
import scala.Option;
import com.typesafe.config.Config;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
public class MyDurableMailboxType implements MailboxType {
public MyDurableMailboxType(ActorSystem.Settings settings, Config config) {
}
@Override
public MessageQueue create(Option<ActorRef> owner,
Option<ActorSystem> system) {
if (owner.isEmpty())
throw new IllegalArgumentException("requires an owner " +
"(i.e. does not work with BalancingDispatcher)");
return new MyDurableMessageQueue(owner.get(),
(ExtendedActorSystem) system.get());
}
}
//#custom-mailbox-type

View file

@ -0,0 +1,96 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.mailbox;
//#durable-message-queue
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Callable;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ExtendedActorSystem;
import akka.actor.mailbox.DurableMessageQueueWithSerialization;
import akka.dispatch.Envelope;
import akka.dispatch.MessageQueue;
import akka.pattern.CircuitBreaker;
public class MyDurableMessageQueue extends DurableMessageQueueWithSerialization {
public MyDurableMessageQueue(ActorRef owner, ExtendedActorSystem system) {
super(owner, system);
}
private final QueueStorage storage = new QueueStorage();
// A real-world implementation would use configuration to set the last
// three parameters below
private final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(), 5,
Duration.create(30, "seconds"), Duration.create(1, "minute"));
@Override
public void enqueue(ActorRef receiver, final Envelope envelope) {
breaker.callWithSyncCircuitBreaker(new Callable<Object>() {
@Override
public Object call() {
byte[] data = serialize(envelope);
storage.push(data);
return null;
}
});
}
@Override
public Envelope dequeue() {
return breaker.callWithSyncCircuitBreaker(new Callable<Envelope>() {
@Override
public Envelope call() {
byte[] data = storage.pull();
if (data == null)
return null;
else
return deserialize(data);
}
});
}
@Override
public boolean hasMessages() {
return breaker.callWithSyncCircuitBreaker(new Callable<Boolean>() {
@Override
public Boolean call() {
return !storage.isEmpty();
}
});
}
@Override
public int numberOfMessages() {
return breaker.callWithSyncCircuitBreaker(new Callable<Integer>() {
@Override
public Integer call() {
return storage.size();
}
});
}
/**
* Called when the mailbox is disposed.
* An ordinary mailbox would send remaining messages to deadLetters,
* but the purpose of a durable mailbox is to continue
* with the same message queue when the actor is started again.
*/
@Override
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {}
//#dummy-queue-storage
// dummy
private static class QueueStorage {
private final ConcurrentLinkedQueue<byte[]> queue =
new ConcurrentLinkedQueue<byte[]>();
public void push(byte[] data) { queue.offer(data); }
public byte[] pull() { return queue.poll(); }
public boolean isEmpty() { return queue.isEmpty(); }
public int size() { return queue.size(); }
}
//#dummy-queue-storage
}
//#durable-message-queue