Merge pull request #1291 from akka/wip-3174-pipelines-∂π

implement and document Pipelines, see #3174
This commit is contained in:
Roland Kuhn 2013-04-08 13:13:23 -07:00
commit 2375972969
21 changed files with 2540 additions and 65 deletions

View file

@ -33,7 +33,8 @@ public class InitializationDocSpecJava {
// of the actor. To opt-out from stopping the children, we
// have to override preRestart()
@Override
public void preRestart(Throwable reason, Option<Object> message) {
public void preRestart(Throwable reason, Option<Object> message)
throws Exception {
// Keep the call to postStop(), but no stopping of children
postStop();
}

View file

@ -0,0 +1,16 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import akka.actor.ActorContext;
import akka.io.PipelineContext;
//#actor-context
public interface HasActorContext extends PipelineContext {
public ActorContext getContext();
}
//#actor-context

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import akka.io.PipelineContext;
public interface HasByteOrder extends PipelineContext {
public ByteOrder byteOrder();
}

View file

@ -0,0 +1,84 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
//#frame
import java.nio.ByteOrder;
import java.util.ArrayList;
import scala.util.Either;
import akka.io.AbstractSymmetricPipePair;
import akka.io.PipePairFactory;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;
import akka.io.SymmetricPipelineStage;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;
public class LengthFieldFrame extends
SymmetricPipelineStage<PipelineContext, ByteString, ByteString> {
final int maxSize;
public LengthFieldFrame(int maxSize) {
this.maxSize = maxSize;
}
@Override
public SymmetricPipePair<ByteString, ByteString> apply(final PipelineContext ctx) {
return PipePairFactory
.create(ctx, new AbstractSymmetricPipePair<ByteString, ByteString>() {
final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
ByteString buffer = null;
@Override
public Iterable<Either<ByteString, ByteString>> onCommand(
ByteString cmd) {
final int length = cmd.length() + 4;
if (length > maxSize) {
return new ArrayList<Either<ByteString, ByteString>>(0);
}
final ByteStringBuilder bb = new ByteStringBuilder();
bb.putInt(length, byteOrder);
bb.append(cmd);
return singleCommand(bb.result());
}
@Override
public Iterable<Either<ByteString, ByteString>> onEvent(
ByteString event) {
final ArrayList<Either<ByteString, ByteString>> res =
new ArrayList<Either<ByteString, ByteString>>();
ByteString current = buffer == null ? event : buffer.concat(event);
while (true) {
if (current.length() == 0) {
buffer = null;
return res;
} else if (current.length() < 4) {
buffer = current;
return res;
} else {
final int length = current.iterator().getInt(byteOrder);
if (length > maxSize)
throw new IllegalArgumentException(
"received too large frame of size " + length + " (max = "
+ maxSize + ")");
if (current.length() < length) {
buffer = current;
return res;
} else {
res.add(makeEvent(current.slice(4, length)));
current = current.drop(length);
}
}
}
}
});
}
}
//#frame

View file

@ -0,0 +1,45 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
//#message
public class Message {
static public class Person {
private final String first;
private final String last;
public Person(String first, String last) {
this.first = first;
this.last = last;
}
public String getFirst() {
return first;
}
public String getLast() {
return last;
}
}
private final Person[] persons;
private final double[] happinessCurve;
public Message(Person[] persons, double[] happinessCurve) {
this.persons = persons;
this.happinessCurve = happinessCurve;
}
public Person[] getPersons() {
return persons;
}
public double[] getHappinessCurve() {
return happinessCurve;
}
}
//#message

View file

@ -0,0 +1,118 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import java.util.Collections;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Either;
import akka.actor.ActorRef;
import akka.io.AbstractSymmetricPipePair;
import akka.io.PipePairFactory;
import akka.io.SymmetricPipePair;
import akka.io.SymmetricPipelineStage;
import akka.util.ByteIterator;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;
//#format
public class MessageStage extends
SymmetricPipelineStage<HasByteOrder, Message, ByteString> {
@Override
public SymmetricPipePair<Message, ByteString> apply(final HasByteOrder context) {
return PipePairFactory
.create(context, new AbstractSymmetricPipePair<Message, ByteString>() {
final ByteOrder byteOrder = context.byteOrder();
private void putString(ByteStringBuilder builder, String str) {
final byte[] bytes = ByteString.fromString(str, "UTF-8").toArray();
builder.putInt(bytes.length, byteOrder);
builder.putBytes(bytes);
}
@Override
public Iterable<Either<Message, ByteString>> onCommand(Message cmd) {
final ByteStringBuilder builder = new ByteStringBuilder();
builder.putInt(cmd.getPersons().length, byteOrder);
for (Message.Person p : cmd.getPersons()) {
putString(builder, p.getFirst());
putString(builder, p.getLast());
}
builder.putInt(cmd.getHappinessCurve().length, byteOrder);
builder.putDoubles(cmd.getHappinessCurve(), byteOrder);
return singleCommand(builder.result());
}
//#decoding-omitted
//#decoding
private String getString(ByteIterator iter) {
final int length = iter.getInt(byteOrder);
final byte[] bytes = new byte[length];
iter.getBytes(bytes);
return ByteString.fromArray(bytes).utf8String();
}
@Override
public Iterable<Either<Message, ByteString>> onEvent(ByteString evt) {
final ByteIterator iter = evt.iterator();
final int personLength = iter.getInt(byteOrder);
final Message.Person[] persons = new Message.Person[personLength];
for (int i = 0; i < personLength; ++i) {
persons[i] = new Message.Person(getString(iter), getString(iter));
}
final int curveLength = iter.getInt(byteOrder);
final double[] curve = new double[curveLength];
iter.getDoubles(curve, byteOrder);
// verify that this was all; could be left out to allow future
// extensions
assert iter.isEmpty();
return singleEvent(new Message(persons, curve));
}
//#decoding
ActorRef target = null;
//#mgmt-ticks
private FiniteDuration lastTick = Duration.Zero();
@Override
public Iterable<Either<Message, ByteString>> onManagementCommand(Object cmd) {
//#omitted
if (cmd instanceof PipelineTest.SetTarget) {
target = ((PipelineTest.SetTarget) cmd).getRef();
} else if (cmd instanceof TickGenerator.Tick && target != null) {
target.tell(cmd, null);
}
//#omitted
if (cmd instanceof TickGenerator.Tick) {
final FiniteDuration timestamp = ((TickGenerator.Tick) cmd)
.getTimestamp();
System.out.println("time since last tick: "
+ timestamp.minus(lastTick));
lastTick = timestamp;
}
return Collections.emptyList();
}
//#mgmt-ticks
//#decoding-omitted
});
}
}
//#format

View file

@ -0,0 +1,167 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActorFactory;
import akka.io.AbstractPipelineContext;
import akka.io.PipelineFactory;
import akka.io.PipelineInjector;
import akka.io.PipelineSink;
import akka.io.PipelineStage;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import akka.util.ByteString;
public class PipelineTest {
//#message
final Message msg = new Message(
new Message.Person[] {
new Message.Person("Alice", "Gibbons"),
new Message.Person("Bob", "Sparseley")
},
new double[] { 1.0, 3.0, 5.0 });
//#message
//#byteorder
class Context extends AbstractPipelineContext implements HasByteOrder {
@Override
public ByteOrder byteOrder() {
return java.nio.ByteOrder.BIG_ENDIAN;
}
}
final Context ctx = new Context();
//#byteorder
static ActorSystem system = null;
@BeforeClass
public static void setup() {
system = ActorSystem.create("PipelineTest");
}
@AfterClass
public static void teardown() {
system.shutdown();
}
@Test
public void demonstratePipeline() throws Exception {
final TestProbe probe = TestProbe.apply(system);
final ActorRef commandHandler = probe.ref();
final ActorRef eventHandler = probe.ref();
//#build-sink
final PipelineStage<Context, Message, ByteString, Message, ByteString> stages =
PipelineStage.sequence(
new MessageStage(),
new LengthFieldFrame(10000)
);
final PipelineSink<ByteString, Message> sink =
new PipelineSink<ByteString, Message>() {
@Override
public void onCommand(ByteString cmd) throws Throwable {
commandHandler.tell(cmd, null);
}
@Override
public void onEvent(Message evt) throws Throwable {
eventHandler.tell(evt, null);
}
};
final PipelineInjector<Message, ByteString> injector =
PipelineFactory.buildWithSink(ctx, stages, sink);
injector.injectCommand(msg);
//#build-sink
final ByteString encoded = probe.expectMsgClass(ByteString.class);
injector.injectEvent(encoded);
final Message decoded = probe.expectMsgClass(Message.class);
assert msg == decoded;
}
static class SetTarget {
final ActorRef ref;
public SetTarget(ActorRef ref) {
super();
this.ref = ref;
}
public ActorRef getRef() {
return ref;
}
}
@Test
public void testTick() {
new JavaTestKit(system) {
{
final ActorRef proc = system.actorOf(new Props(
new UntypedActorFactory() {
private static final long serialVersionUID = 1L;
@Override
public Actor create() throws Exception {
return new Processor(getRef(), getRef()) {
@Override
public void onReceive(Object obj) throws Exception {
if (obj.equals("fail!")) {
throw new RuntimeException("FAIL!");
}
super.onReceive(obj);
}
};
}
}), "processor");
expectMsgClass(TickGenerator.Tick.class);
proc.tell(msg, null);
final ByteString encoded = expectMsgClass(ByteString.class);
proc.tell(encoded, null);
final Message decoded = expectMsgClass(Message.class);
assert msg == decoded;
new Within(Duration.create(1500, TimeUnit.MILLISECONDS),
Duration.create(3, TimeUnit.SECONDS)) {
protected void run() {
expectMsgClass(TickGenerator.Tick.class);
expectMsgClass(TickGenerator.Tick.class);
}
};
proc.tell("fail!", null);
new Within(Duration.create(1700, TimeUnit.MILLISECONDS),
Duration.create(3, TimeUnit.SECONDS)) {
protected void run() {
expectMsgClass(TickGenerator.Tick.class);
expectMsgClass(TickGenerator.Tick.class);
proc.tell(PoisonPill.getInstance(), null);
expectNoMsg();
}
};
}
};
}
}

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.io.AbstractPipelineContext;
import akka.io.PipelineFactory;
import akka.io.PipelineInjector;
import akka.io.PipelineSink;
import akka.io.PipelineStage;
import akka.util.ByteString;
import scala.concurrent.duration.*;
//#actor
public class Processor extends UntypedActor {
private class Context extends AbstractPipelineContext
implements HasByteOrder, HasActorContext {
@Override
public ActorContext getContext() {
return Processor.this.getContext();
}
@Override
public ByteOrder byteOrder() {
return java.nio.ByteOrder.BIG_ENDIAN;
}
}
final Context ctx = new Context();
final FiniteDuration interval = Duration.apply(1, TimeUnit.SECONDS);
final PipelineStage<Context, Message, ByteString, Message, ByteString> stages =
PipelineStage.sequence(
// Java 7 can infer these types, Java 6 cannot
PipelineStage.<Context, Message, Message, ByteString, Message, Message, ByteString> sequence( //
new TickGenerator<Message, Message>(interval), //
new MessageStage()), //
new LengthFieldFrame(10000));
private final ActorRef evts;
private final ActorRef cmds;
final PipelineInjector<Message, ByteString> injector = PipelineFactory
.buildWithSink(ctx, stages, new PipelineSink<ByteString, Message>() {
@Override
public void onCommand(ByteString cmd) {
cmds.tell(cmd, getSelf());
}
@Override
public void onEvent(Message evt) {
evts.tell(evt, getSelf());
}
});
public Processor(ActorRef cmds, ActorRef evts) throws Exception {
this.cmds = cmds;
this.evts = evts;
}
//#omitted
@Override
public void preStart() throws Exception {
injector.managementCommand(new PipelineTest.SetTarget(cmds));
}
//#omitted
@Override
public void onReceive(Object obj) throws Exception {
if (obj instanceof Message) {
injector.injectCommand((Message) obj);
} else if (obj instanceof ByteString) {
injector.injectEvent((ByteString) obj);
} else if (obj instanceof TickGenerator.Trigger) {
injector.managementCommand(obj);
}
}
}
//#actor

View file

@ -0,0 +1,86 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.util.Collections;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Either;
import akka.actor.ActorSystem;
import akka.io.AbstractPipePair;
import akka.io.PipePair;
import akka.io.PipePairFactory;
import akka.io.PipelineStage;
//#tick-generator
public class TickGenerator<Cmd, Evt> extends
PipelineStage<HasActorContext, Cmd, Cmd, Evt, Evt> {
public static interface Trigger {};
public static class Tick implements Trigger {
final FiniteDuration timestamp;
public Tick(FiniteDuration timestamp) {
super();
this.timestamp = timestamp;
}
public FiniteDuration getTimestamp() {
return timestamp;
}
}
private final FiniteDuration interval;
public TickGenerator(FiniteDuration interval) {
this.interval = interval;
}
@Override
public PipePair<Cmd, Cmd, Evt, Evt> apply(final HasActorContext ctx) {
return PipePairFactory.create(ctx,
new AbstractPipePair<Cmd, Cmd, Evt, Evt>() {
private final Trigger trigger = new Trigger() {
public String toString() {
return "Tick[" + ctx.getContext().self().path() + "]";
}
};
private void schedule() {
final ActorSystem system = ctx.getContext().system();
system.scheduler().scheduleOnce(interval,
ctx.getContext().self(), trigger, system.dispatcher(), null);
}
{
schedule();
}
@Override
public Iterable<Either<Evt, Cmd>> onCommand(Cmd cmd) {
return singleCommand(cmd);
}
@Override
public Iterable<Either<Evt, Cmd>> onEvent(Evt evt) {
return singleEvent(evt);
}
@Override
public Iterable<Either<Evt, Cmd>> onManagementCommand(Object cmd) {
if (cmd == trigger) {
ctx.getContext().self().tell(new Tick(Deadline.now().time()), null);
schedule();
}
return Collections.emptyList();
}
});
}
}
//#tick-generator