Documentation improvements

* Re-enabled Java tests in akka-docs (they were not run before)
* Fixed bug #19764
* #19735 Rewrote every sample that used the deprecated PushPullStage and friends
  to use GraphStage instead
* Pruned old unused graph images
* Added missing graffle file for new graph images
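
As a minimal before/after sketch of the PushPullStage migration (using the FrameParser stage that is rewritten further down in this diff): a custom stage that used to be plugged in with transform() is now a GraphStage attached with via():

    // before: deprecated 1.x-style stage factory (removed in this commit)
    Flow.of(ByteString.class).transform(() -> new FrameParser());

    // after: GraphStage-based stage, attached with via()
    Flow.of(ByteString.class).via(new FrameParser());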
This commit is contained in:
Johan Andrén 2016-02-11 16:39:25 +01:00
parent 8f3c5aa17f
commit 737991c01c
103 changed files with 1136 additions and 4749 deletions

(Binary image diffs omitted: several old, unused graph images removed, roughly 12–112 KiB each; two further binary files, including the added graffle source, are not shown by the diff viewer.)

View file

@ -0,0 +1,13 @@
/*
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package docs
import org.scalatest.junit.JUnitSuite
/**
* Base class for all runnable example tests written in Java
*/
abstract class AbstractJavaTest extends JUnitSuite {
}
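
The pattern applied to the Java doc tests throughout the rest of this commit is then simply to extend this base class, so that ScalaTest's JUnitSuite integration discovers and runs the JUnit tests as part of the docs suite. A minimal sketch (test name and body hypothetical):

    package docs.actor;

    import docs.AbstractJavaTest;
    import org.junit.Test;

    public class SomeDocTest extends AbstractJavaTest {
      @Test
      public void runsAsPartOfTheDocsSuite() {
        // plain JUnit test body; picked up via the JUnitSuite base class
      }
    }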

View file

@ -11,6 +11,8 @@ import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import org.scalatest.junit.JUnitSuite;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actor.Messages.Swap.Swap;
@ -44,7 +46,7 @@ import scala.concurrent.Future;
import static akka.pattern.Patterns.gracefulStop;
//#import-graceFulStop
public class ActorDocTest {
public class ActorDocTest extends AbstractJavaTest {
public static Config config = ConfigFactory.parseString(
"akka {\n" +
@ -323,7 +325,7 @@ public class ActorDocTest {
//#system-actorOf
// ActorSystem is a heavy object: create only one per application
final ActorSystem system = ActorSystem.create("MySystem", config);
final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");
final ActorRef myActor = system.actorOf(Props.create(FirstActor.class), "myactor");
//#system-actorOf
try {
new JavaTestKit(system) {

View file

@ -20,8 +20,9 @@ import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import akka.testkit.AkkaSpec;
import docs.AbstractJavaTest;
public class FSMDocTest {
public class FSMDocTest extends AbstractJavaTest {
static
//#data

View file

@ -16,6 +16,7 @@ import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import docs.AbstractJavaTest;
import scala.collection.immutable.Seq;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
@ -37,7 +38,7 @@ import org.junit.BeforeClass;
import org.junit.AfterClass;
//#testkit
public class FaultHandlingTest {
public class FaultHandlingTest extends AbstractJavaTest {
//#testkit
static
//#supervisor

View file

@ -14,6 +14,7 @@ import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.scalatest.junit.JUnitSuite;
import scala.PartialFunction;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
@ -35,7 +36,7 @@ import org.junit.AfterClass;
import scala.runtime.BoxedUnit;
//#testkit
public class FaultHandlingTestJava8 {
public class FaultHandlingTestJava8 extends JUnitSuite {
//#testkit
public static Config config = ConfigFactory.parseString(

View file

@ -7,6 +7,7 @@ package docs.actor;
import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaJUnitActorSystemResource;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
@ -19,7 +20,7 @@ import akka.actor.Terminated;
import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
public class InboxDocTest {
public class InboxDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -7,11 +7,12 @@ import akka.actor.*;
import akka.japi.Procedure;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
import scala.Option;
public class InitializationDocSpecJava {
public class InitializationDocSpecJava extends AbstractJavaTest {
static public class PreStartInitExample extends UntypedActor {

View file

@ -7,6 +7,7 @@ package docs.actor;
import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -17,7 +18,7 @@ import scala.concurrent.Await;
import java.util.concurrent.TimeUnit;
public class InitializationDocTest {
public class InitializationDocTest extends AbstractJavaTest {
static ActorSystem system = null;

View file

@ -8,13 +8,14 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class SampleActorTest {
public class SampleActorTest extends AbstractJavaTest {
static ActorSystem system;

View file

@ -5,6 +5,7 @@ package docs.actor;
//#imports1
import akka.actor.Props;
import docs.AbstractJavaTest;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
//#imports1
@ -20,7 +21,7 @@ import akka.testkit.AkkaSpec;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.*;
public class SchedulerDocTest {
public class SchedulerDocTest extends AbstractJavaTest {
@Rule
public AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -10,6 +10,7 @@ import akka.actor.*;
import akka.japi.*;
import akka.dispatch.Futures;
import docs.AbstractJavaTest;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
@ -25,7 +26,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
//#imports
public class TypedActorDocTest {
public class TypedActorDocTest extends AbstractJavaTest {
Object someReference = null;
ActorSystem system = null;

View file

@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaJUnitActorSystemResource;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
@ -83,7 +84,7 @@ import akka.testkit.JavaTestKit;
import akka.util.Timeout;
//#import-ask
public class UntypedActorDocTest {
public class UntypedActorDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -8,6 +8,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -20,7 +21,7 @@ import static docs.actor.fsm.Events.SetTarget;
import static docs.actor.fsm.Events.Flush.Flush;
//#test-code
public class BuncherTest {
public class BuncherTest extends AbstractJavaTest {
static ActorSystem system;

View file

@ -6,6 +6,7 @@ package docs.actor.fsm;
import akka.actor.*;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -18,7 +19,7 @@ import static docs.actor.fsm.FSMDocTest.StateType.*;
import static docs.actor.fsm.FSMDocTest.Messages.*;
import static java.util.concurrent.TimeUnit.*;
public class FSMDocTest {
public class FSMDocTest extends AbstractJavaTest {
static ActorSystem system;
@BeforeClass

View file

@ -11,6 +11,7 @@ import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actorlambda.Messages.Swap.Swap;
@ -45,7 +46,7 @@ import scala.concurrent.Future;
import static akka.pattern.Patterns.gracefulStop;
//#import-graceFulStop
public class ActorDocTest {
public class ActorDocTest extends AbstractJavaTest {
public static Config config = ConfigFactory.parseString(
"akka {\n" +

View file

@ -14,6 +14,7 @@ import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import scala.PartialFunction;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
@ -35,7 +36,7 @@ import org.junit.AfterClass;
import scala.runtime.BoxedUnit;
//#testkit
public class FaultHandlingTest {
public class FaultHandlingTest extends AbstractJavaTest {
//#testkit
public static Config config = ConfigFactory.parseString(

View file

@ -10,6 +10,7 @@ import akka.actor.Props;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -18,7 +19,7 @@ import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class InitializationDocTest {
public class InitializationDocTest extends AbstractJavaTest {
static ActorSystem system = null;

View file

@ -8,13 +8,14 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class SampleActorTest {
public class SampleActorTest extends AbstractJavaTest {
static ActorSystem system;

View file

@ -8,6 +8,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -20,7 +21,7 @@ import static docs.actorlambda.fsm.Events.SetTarget;
import static docs.actorlambda.fsm.Events.Flush.Flush;
//#test-code
public class BuncherTest {
public class BuncherTest extends AbstractJavaTest {
static ActorSystem system;

View file

@ -6,6 +6,7 @@ package docs.actorlambda.fsm;
import akka.actor.*;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -18,7 +19,7 @@ import static docs.actorlambda.fsm.FSMDocTest.StateType.*;
import static docs.actorlambda.fsm.FSMDocTest.Messages.*;
import static java.util.concurrent.TimeUnit.*;
public class FSMDocTest {
public class FSMDocTest extends AbstractJavaTest {
static ActorSystem system;
@BeforeClass

View file

@ -9,6 +9,7 @@ package docs.camel;
import akka.testkit.JavaTestKit;
import akka.testkit.TestKit;
import akka.util.Timeout;
import docs.AbstractJavaTest;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import static java.util.concurrent.TimeUnit.SECONDS;
@ -16,7 +17,7 @@ package docs.camel;
import org.junit.Test;
public class ActivationTestBase {
public class ActivationTestBase extends AbstractJavaTest {
@SuppressWarnings("unused")
@Test

View file

@ -4,11 +4,12 @@ import akka.actor.ActorSystem;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.junit.Test;
public class CamelExtensionTest {
public class CamelExtensionTest extends AbstractJavaTest {
@Test
public void getCamelExtension() {
//#CamelExtension

View file

@ -4,6 +4,7 @@
package docs.cluster;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -13,7 +14,7 @@ import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
public class ClusterDocTest {
public class ClusterDocTest extends AbstractJavaTest {
static ActorSystem system;

View file

@ -8,7 +8,10 @@ import java.util.Arrays;
import java.util.Set;
import java.math.BigInteger;
import java.util.Optional;
import akka.actor.*;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
@ -20,9 +23,6 @@ import org.junit.BeforeClass;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.ThreadLocalRandom;
import akka.actor.Actor;
import akka.actor.ActorLogging;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.cluster.ddata.*;
import akka.japi.pf.ReceiveBuilder;
@ -33,26 +33,16 @@ import akka.testkit.AkkaSpec;
import akka.testkit.ImplicitSender;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import akka.actor.ActorRef;
import akka.serialization.SerializationExtension;
public class DistributedDataDocTest {
public class DistributedDataDocTest extends AbstractJavaTest {
static ActorSystem system;
void receive(PartialFunction<Object, BoxedUnit> pf) {
}
JavaTestKit probe = new JavaTestKit(system);
ActorRef self() {
return probe.getRef();
}
ActorRef sender() {
return probe.getRef();
}
@BeforeClass
public static void setup() {
system = ActorSystem.create("DistributedDataDocTest",
@ -67,180 +57,189 @@ public class DistributedDataDocTest {
@Test
public void demonstrateUpdate() {
probe = new JavaTestKit(system);
new JavaTestKit(system) {
{
//#update
final Cluster node = Cluster.get(system);
final ActorRef replicator = DistributedData.get(system).replicator();
//#update
final Cluster node = Cluster.get(system);
final ActorRef replicator = DistributedData.get(system).replicator();
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
final Key<GSet<String>> set1Key = GSetKey.create("set1");
final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
final Key<Flag> activeFlagKey = FlagKey.create("active");
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
final Key<GSet<String>> set1Key = GSetKey.create("set1");
final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
final Key<Flag> activeFlagKey = FlagKey.create("active");
replicator.tell(new Replicator.Update<PNCounter>(counter1Key, PNCounter.create(),
Replicator.writeLocal(), curr -> curr.increment(node, 1)), self());
replicator.tell(new Replicator.Update<PNCounter>(counter1Key, PNCounter.create(),
Replicator.writeLocal(), curr -> curr.increment(node, 1)), getTestActor());
final WriteConsistency writeTo3 = new WriteTo(3, Duration.create(1, SECONDS));
replicator.tell(new Replicator.Update<GSet<String>>(set1Key, GSet.create(),
writeTo3, curr -> curr.add("hello")), self());
final WriteConsistency writeTo3 = new WriteTo(3, Duration.create(1, SECONDS));
replicator.tell(new Replicator.Update<GSet<String>>(set1Key, GSet.create(),
writeTo3, curr -> curr.add("hello")), getTestActor());
final WriteConsistency writeMajority =
new WriteMajority(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Update<ORSet<String>>(set2Key, ORSet.create(),
writeMajority, curr -> curr.add(node, "hello")), self());
final WriteConsistency writeMajority =
new WriteMajority(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Update<ORSet<String>>(set2Key, ORSet.create(),
writeMajority, curr -> curr.add(node, "hello")), getTestActor());
final WriteConsistency writeAll = new WriteAll(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Update<Flag>(activeFlagKey, Flag.create(),
writeAll, curr -> curr.switchOn()), self());
//#update
final WriteConsistency writeAll = new WriteAll(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Update<Flag>(activeFlagKey, Flag.create(),
writeAll, curr -> curr.switchOn()), getTestActor());
//#update
probe.expectMsgClass(UpdateSuccess.class);
//#update-response1
receive(ReceiveBuilder.
match(UpdateSuccess.class, a -> a.key().equals(counter1Key), a -> {
// ok
}).build());
//#update-response1
expectMsgClass(UpdateSuccess.class);
//#update-response1
receive(ReceiveBuilder.
match(UpdateSuccess.class, a -> a.key().equals(counter1Key), a -> {
// ok
}).build());
//#update-response1
//#update-response2
receive(ReceiveBuilder.
match(UpdateSuccess.class, a -> a.key().equals(set1Key), a -> {
// ok
}).
match(UpdateTimeout.class, a -> a.key().equals(set1Key), a -> {
// write to 3 nodes failed within 1.second
}).build());
//#update-response2
//#update-response2
receive(ReceiveBuilder.
match(UpdateSuccess.class, a -> a.key().equals(set1Key), a -> {
// ok
}).
match(UpdateTimeout.class, a -> a.key().equals(set1Key), a -> {
// write to 3 nodes failed within 1.second
}).build());
//#update-response2
}};
}
@Test
public void demonstrateUpdateWithRequestContext() {
probe = new JavaTestKit(system);
new JavaTestKit(system) {
{
//#update-request-context
final Cluster node = Cluster.get(system);
final ActorRef replicator = DistributedData.get(system).replicator();
//#update-request-context
final Cluster node = Cluster.get(system);
final ActorRef replicator = DistributedData.get(system).replicator();
final WriteConsistency writeTwo = new WriteTo(2, Duration.create(3, SECONDS));
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
receive(ReceiveBuilder.
match(String.class, a -> a.equals("increment"), a -> {
// incoming command to increase the counter
Optional<Object> reqContext = Optional.of(sender());
Replicator.Update<PNCounter> upd = new Replicator.Update<PNCounter>(counter1Key,
PNCounter.create(), writeTwo, reqContext, curr -> curr.increment(node, 1));
replicator.tell(upd, self());
}).
match(UpdateSuccess.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell("ack", self());
}).
match(UpdateTimeout.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell("nack", self());
}).build());
final WriteConsistency writeTwo = new WriteTo(2, Duration.create(3, SECONDS));
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
//#update-request-context
receive(ReceiveBuilder.
match(String.class, a -> a.equals("increment"), a -> {
// incoming command to increase the counter
Optional<Object> reqContext = Optional.of(getRef());
Replicator.Update<PNCounter> upd = new Replicator.Update<PNCounter>(counter1Key,
PNCounter.create(), writeTwo, reqContext, curr -> curr.increment(node, 1));
replicator.tell(upd, getTestActor());
}).
match(UpdateSuccess.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell("ack", getTestActor());
}).
match(UpdateTimeout.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell("nack", getTestActor());
}).build());
//#update-request-context
}
};
}
@SuppressWarnings({ "unused", "unchecked" })
@Test
public void demonstrateGet() {
probe = new JavaTestKit(system);
new JavaTestKit(system) {
{
//#get
final ActorRef replicator = DistributedData.get(system).replicator();
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
final Key<GSet<String>> set1Key = GSetKey.create("set1");
final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
final Key<Flag> activeFlagKey = FlagKey.create("active");
//#get
final ActorRef replicator = DistributedData.get(system).replicator();
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
final Key<GSet<String>> set1Key = GSetKey.create("set1");
final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
final Key<Flag> activeFlagKey = FlagKey.create("active");
replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
Replicator.readLocal()), self());
replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
Replicator.readLocal()), getTestActor());
final ReadConsistency readFrom3 = new ReadFrom(3, Duration.create(1, SECONDS));
replicator.tell(new Replicator.Get<GSet<String>>(set1Key,
readFrom3), self());
final ReadConsistency readFrom3 = new ReadFrom(3, Duration.create(1, SECONDS));
replicator.tell(new Replicator.Get<GSet<String>>(set1Key,
readFrom3), getTestActor());
final ReadConsistency readMajority = new ReadMajority(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Get<ORSet<String>>(set2Key,
readMajority), self());
final ReadConsistency readMajority = new ReadMajority(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Get<ORSet<String>>(set2Key,
readMajority), getTestActor());
final ReadConsistency readAll = new ReadAll(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Get<Flag>(activeFlagKey,
readAll), self());
//#get
final ReadConsistency readAll = new ReadAll(Duration.create(5, SECONDS));
replicator.tell(new Replicator.Get<Flag>(activeFlagKey,
readAll), getTestActor());
//#get
//#get-response1
receive(ReceiveBuilder.
match(GetSuccess.class, a -> a.key().equals(counter1Key), a -> {
GetSuccess<PNCounter> g = a;
BigInteger value = g.dataValue().getValue();
}).
match(NotFound.class, a -> a.key().equals(counter1Key), a -> {
// key counter1 does not exist
}).build());
//#get-response1
//#get-response1
receive(ReceiveBuilder.
match(GetSuccess.class, a -> a.key().equals(counter1Key), a -> {
GetSuccess<PNCounter> g = a;
BigInteger value = g.dataValue().getValue();
}).
match(NotFound.class, a -> a.key().equals(counter1Key), a -> {
// key counter1 does not exist
}).build());
//#get-response1
//#get-response2
receive(ReceiveBuilder.
match(GetSuccess.class, a -> a.key().equals(set1Key), a -> {
GetSuccess<GSet<String>> g = a;
Set<String> value = g.dataValue().getElements();
}).
match(GetFailure.class, a -> a.key().equals(set1Key), a -> {
// read from 3 nodes failed within 1.second
}).
match(NotFound.class, a -> a.key().equals(set1Key), a -> {
// key set1 does not exist
}).build());
//#get-response2
//#get-response2
receive(ReceiveBuilder.
match(GetSuccess.class, a -> a.key().equals(set1Key), a -> {
GetSuccess<GSet<String>> g = a;
Set<String> value = g.dataValue().getElements();
}).
match(GetFailure.class, a -> a.key().equals(set1Key), a -> {
// read from 3 nodes failed within 1.second
}).
match(NotFound.class, a -> a.key().equals(set1Key), a -> {
// key set1 does not exist
}).build());
//#get-response2
}
};
}
@SuppressWarnings("unchecked")
@Test
public void demonstrateGetWithRequestContext() {
probe = new JavaTestKit(system);
new JavaTestKit(system) {
{
//#get-request-context
final ActorRef replicator = DistributedData.get(system).replicator();
final ReadConsistency readTwo = new ReadFrom(2, Duration.create(3, SECONDS));
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
//#get-request-context
final ActorRef replicator = DistributedData.get(system).replicator();
final ReadConsistency readTwo = new ReadFrom(2, Duration.create(3, SECONDS));
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
receive(ReceiveBuilder.
match(String.class, a -> a.equals("get-count"), a -> {
// incoming request to retrieve current value of the counter
Optional<Object> reqContext = Optional.of(sender());
replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
readTwo), self());
}).
match(GetSuccess.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
GetSuccess<PNCounter> g = a;
long value = g.dataValue().getValue().longValue();
replyTo.tell(value, self());
}).
match(GetFailure.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell(-1L, self());
}).
match(NotFound.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell(0L, self());
}).build());
//#get-request-context
receive(ReceiveBuilder.
match(String.class, a -> a.equals("get-count"), a -> {
// incoming request to retrieve current value of the counter
Optional<Object> reqContext = Optional.of(getTestActor());
replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
readTwo), getTestActor());
}).
match(GetSuccess.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
GetSuccess<PNCounter> g = a;
long value = g.dataValue().getValue().longValue();
replyTo.tell(value, getTestActor());
}).
match(GetFailure.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell(-1L, getTestActor());
}).
match(NotFound.class, a -> a.key().equals(counter1Key), a -> {
ActorRef replyTo = (ActorRef) a.getRequest().get();
replyTo.tell(0L, getTestActor());
}).build());
//#get-request-context
}
};
}
@SuppressWarnings("unchecked")
abstract class MyActor {
abstract class MyActor extends AbstractActor {
//#subscribe
final ActorRef replicator = DistributedData.get(system).replicator();
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
@ -270,21 +269,22 @@ public class DistributedDataDocTest {
@Test
public void demonstrateDelete() {
probe = new JavaTestKit(system);
new JavaTestKit(system) {
{
//#delete
final ActorRef replicator = DistributedData.get(system).replicator();
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
//#delete
final ActorRef replicator = DistributedData.get(system).replicator();
final Key<PNCounter> counter1Key = PNCounterKey.create("counter1");
final Key<ORSet<String>> set2Key = ORSetKey.create("set2");
replicator.tell(new Delete<PNCounter>(counter1Key,
Replicator.writeLocal()), getTestActor());
replicator.tell(new Delete<PNCounter>(counter1Key,
Replicator.writeLocal()), self());
final WriteConsistency writeMajority =
new WriteMajority(Duration.create(5, SECONDS));
replicator.tell(new Delete<PNCounter>(counter1Key,
writeMajority), self());
//#delete
final WriteConsistency writeMajority =
new WriteMajority(Duration.create(5, SECONDS));
replicator.tell(new Delete<PNCounter>(counter1Key,
writeMajority), getTestActor());
//#delete
}};
}
public void demonstratePNCounter() {

View file

@ -7,6 +7,7 @@ import akka.dispatch.ControlMessage;
import akka.dispatch.RequiresMessageQueue;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import docs.actor.MyBoundedUntypedActor;
import docs.actor.MyUntypedActor;
import org.junit.ClassRule;
@ -36,7 +37,7 @@ import com.typesafe.config.Config;
//#imports-required-mailbox
public class DispatcherDocTest {
public class DispatcherDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -7,6 +7,7 @@ import akka.event.japi.EventBus;
import java.util.concurrent.TimeUnit;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
@ -43,7 +44,7 @@ import akka.event.japi.ManagedActorEventBus;
//#actor-bus
public class EventBusDocTest {
public class EventBusDocTest extends AbstractJavaTest {
public static class Event {}
public static class Subscriber {}

View file

@ -19,6 +19,7 @@ import akka.event.Logging.Debug;
//#imports-listener
import docs.AbstractJavaTest;
import org.junit.Test;
import akka.testkit.JavaTestKit;
import scala.Option;
@ -35,7 +36,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
//#imports-deadletter
public class LoggingDocTest {
public class LoggingDocTest extends AbstractJavaTest {
@Test
public void useLoggingActor() {
@ -79,7 +80,7 @@ public class LoggingDocTest {
}
}
class Listener extends UntypedActor {
static class Listener extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Jazz) {

View file

@ -9,9 +9,10 @@ import java.util.concurrent.atomic.AtomicLong;
//#imports
import docs.AbstractJavaTest;
import org.junit.Test;
public class ExtensionDocTest {
public class ExtensionDocTest extends AbstractJavaTest {
static
//#extension

View file

@ -9,6 +9,7 @@ import akka.actor.AbstractExtensionId;
import akka.actor.ExtensionIdProvider;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import docs.AbstractJavaTest;
import scala.concurrent.duration.Duration;
import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit;
@ -18,7 +19,7 @@ import java.util.concurrent.TimeUnit;
import akka.actor.UntypedActor;
import org.junit.Test;
public class SettingsExtensionDocTest {
public class SettingsExtensionDocTest extends AbstractJavaTest {
static
//#extension

View file

@ -5,6 +5,7 @@ package docs.future;
//#imports1
import akka.dispatch.*;
import docs.AbstractJavaTest;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Await;
@ -66,7 +67,7 @@ import akka.pattern.Patterns;
import static org.junit.Assert.*;
public class FutureDocTest {
public class FutureDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -6,8 +6,6 @@ package docs.http.javadsl.server;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.dispatch.OnFailure;
import akka.http.impl.util.Util;
import akka.http.javadsl.Http;
import akka.http.javadsl.IncomingConnection;
import akka.http.javadsl.ServerBinding;
@ -19,16 +17,11 @@ import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.Uri;
import akka.http.scaladsl.model.HttpEntity;
import akka.japi.function.Function;
import akka.japi.function.Procedure;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.stage.Context;
import akka.stream.stage.PushStage;
import akka.stream.stage.SyncDirective;
import akka.stream.stage.TerminationDirective;
import akka.util.ByteString;
import java.io.BufferedReader;
@ -88,19 +81,14 @@ public class HttpServerExampleDocTest {
Http.get(system).bind("localhost", 8080, materializer);
Flow<IncomingConnection, IncomingConnection, NotUsed> failureDetection =
Flow.of(IncomingConnection.class).transform(() ->
new PushStage<IncomingConnection, IncomingConnection>() {
@Override
public SyncDirective onPush(IncomingConnection elem, Context<IncomingConnection> ctx) {
return ctx.push(elem);
}
@Override
public TerminationDirective onUpstreamFailure(Throwable cause, Context<IncomingConnection> ctx) {
Flow.of(IncomingConnection.class).watchTermination((notUsed, termination) -> {
termination.whenComplete((done, cause) -> {
if (cause != null) {
// signal the failure to external monitoring service!
return super.onUpstreamFailure(cause, ctx);
}
}
});
return NotUsed.getInstance();
});
CompletionStage<ServerBinding> serverBindingFuture =
serverSource
@ -123,18 +111,14 @@ public class HttpServerExampleDocTest {
Http.get(system).bind("localhost", 8080, materializer);
Flow<HttpRequest, HttpRequest, NotUsed> failureDetection =
Flow.of(HttpRequest.class).transform(() ->
new PushStage<HttpRequest, HttpRequest>() {
@Override
public SyncDirective onPush(HttpRequest elem, Context<HttpRequest> ctx) {
return ctx.push(elem);
}
@Override
public TerminationDirective onUpstreamFailure(Throwable cause, Context<HttpRequest> ctx) {
// signal the failure to external monitoring service!
return super.onUpstreamFailure(cause, ctx);
}
Flow.of(HttpRequest.class)
.watchTermination((notUsed, termination) -> {
termination.whenComplete((done, cause) -> {
if (cause != null) {
// signal the failure to external monitoring service!
}
});
return NotUsed.getInstance();
});
Flow<HttpRequest, HttpResponse, NotUsed> httpEcho =
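
Consolidated, the watchTermination-based replacement for the old PushStage failure hook reads as follows (a sketch of the same code shown interleaved in the hunks above):

    Flow<HttpRequest, HttpRequest, NotUsed> failureDetection =
      Flow.of(HttpRequest.class)
        .watchTermination((notUsed, termination) -> {
          termination.whenComplete((done, cause) -> {
            if (cause != null) {
              // signal the failure to an external monitoring service
            }
          });
          return NotUsed.getInstance();
        });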

View file

@ -9,6 +9,7 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.io.Udp;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -19,6 +20,9 @@ import java.net.NetworkInterface;
import java.util.Enumeration;
import java.util.Random;
// not part of the test suite because we have not figured out
// a way to find an interface that is sure to work on all platforms
// to listen for udp on
public class JavaUdpMulticastTest {
static ActorSystem system;

View file

@ -6,6 +6,7 @@ package docs.io.japi;
import akka.testkit.AkkaJUnitActorSystemResource;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
@ -29,7 +30,7 @@ import akka.util.ByteString;
import akka.testkit.JavaTestKit;
import akka.testkit.AkkaSpec;
public class IODocTest {
public class IODocTest extends AbstractJavaTest {
static
//#server

View file

@ -5,6 +5,7 @@ package docs.jrouting;
import akka.testkit.AkkaJUnitActorSystemResource;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
@ -30,7 +31,7 @@ import akka.routing.ConsistentHashingRouter.ConsistentHashMapper;
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
//#imports2
public class ConsistentHashingRouterDocTest {
public class ConsistentHashingRouterDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -10,6 +10,7 @@ import akka.routing.RoutingLogic;
import akka.routing.SeveralRoutees;
import akka.testkit.AkkaJUnitActorSystemResource;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
@ -36,7 +37,7 @@ import java.util.List;
//#imports1
public class CustomRouterDocTest {
public class CustomRouterDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -5,6 +5,7 @@ package docs.jrouting;
import akka.testkit.AkkaJUnitActorSystemResource;
import docs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
@ -66,7 +67,7 @@ import akka.routing.TailChoppingPool;
//#imports2
public class RouterDocTest {
public class RouterDocTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -8,13 +8,14 @@ import akka.actor.*;
import akka.testkit.*;
import akka.testkit.TestEvent.Mute;
import akka.testkit.TestEvent.UnMute;
import docs.AbstractJavaTest;
import org.junit.*;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class SchedulerPatternTest {
public class SchedulerPatternTest extends AbstractJavaTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =

View file

@ -14,6 +14,7 @@ import akka.stream.actor.ActorPublisherMessage;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -21,100 +22,100 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class ActorPublisherDocTest {
public class ActorPublisherDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("ActorPublisherDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
//#job-manager
public static class JobManagerProtocol {
final public static class Job {
public final String payload;
public Job(String payload) {
this.payload = payload;
}
//#job-manager
public static class JobManagerProtocol {
final public static class Job {
public final String payload;
public Job(String payload) {
this.payload = payload;
}
public static class JobAcceptedMessage {
@Override
public String toString() {
return "JobAccepted";
}
}
public static final JobAcceptedMessage JobAccepted = new JobAcceptedMessage();
public static class JobDeniedMessage {
@Override
public String toString() {
return "JobDenied";
}
}
public static final JobDeniedMessage JobDenied = new JobDeniedMessage();
}
public static class JobManager extends AbstractActorPublisher<JobManagerProtocol.Job> {
public static Props props() { return Props.create(JobManager.class); }
private final int MAX_BUFFER_SIZE = 100;
private final List<JobManagerProtocol.Job> buf = new ArrayList<>();
public JobManager() {
receive(ReceiveBuilder.
match(JobManagerProtocol.Job.class, job -> buf.size() == MAX_BUFFER_SIZE, job -> {
sender().tell(JobManagerProtocol.JobDenied, self());
}).
match(JobManagerProtocol.Job.class, job -> {
sender().tell(JobManagerProtocol.JobAccepted, self());
if (buf.isEmpty() && totalDemand() > 0)
onNext(job);
else {
buf.add(job);
deliverBuf();
}
}).
match(ActorPublisherMessage.Request.class, request -> deliverBuf()).
match(ActorPublisherMessage.Cancel.class, cancel -> context().stop(self())).
build());
public static class JobAcceptedMessage {
@Override
public String toString() {
return "JobAccepted";
}
}
public static final JobAcceptedMessage JobAccepted = new JobAcceptedMessage();
void deliverBuf() {
while (totalDemand() > 0) {
/*
* totalDemand is a Long and could be larger than
* what buf.splitAt can accept
*/
if (totalDemand() <= Integer.MAX_VALUE) {
final List<JobManagerProtocol.Job> took =
buf.subList(0, Math.min(buf.size(), (int) totalDemand()));
took.forEach(this::onNext);
buf.removeAll(took);
break;
} else {
final List<JobManagerProtocol.Job> took =
buf.subList(0, Math.min(buf.size(), Integer.MAX_VALUE));
took.forEach(this::onNext);
buf.removeAll(took);
public static class JobDeniedMessage {
@Override
public String toString() {
return "JobDenied";
}
}
public static final JobDeniedMessage JobDenied = new JobDeniedMessage();
}
public static class JobManager extends AbstractActorPublisher<JobManagerProtocol.Job> {
public static Props props() { return Props.create(JobManager.class); }
private final int MAX_BUFFER_SIZE = 100;
private final List<JobManagerProtocol.Job> buf = new ArrayList<>();
public JobManager() {
receive(ReceiveBuilder.
match(JobManagerProtocol.Job.class, job -> buf.size() == MAX_BUFFER_SIZE, job -> {
sender().tell(JobManagerProtocol.JobDenied, self());
}).
match(JobManagerProtocol.Job.class, job -> {
sender().tell(JobManagerProtocol.JobAccepted, self());
if (buf.isEmpty() && totalDemand() > 0)
onNext(job);
else {
buf.add(job);
deliverBuf();
}
}).
match(ActorPublisherMessage.Request.class, request -> deliverBuf()).
match(ActorPublisherMessage.Cancel.class, cancel -> context().stop(self())).
build());
}
void deliverBuf() {
while (totalDemand() > 0) {
/*
* totalDemand is a Long and could be larger than
* what buf.splitAt can accept
*/
if (totalDemand() <= Integer.MAX_VALUE) {
final List<JobManagerProtocol.Job> took =
buf.subList(0, Math.min(buf.size(), (int) totalDemand()));
took.forEach(this::onNext);
buf.removeAll(took);
break;
} else {
final List<JobManagerProtocol.Job> took =
buf.subList(0, Math.min(buf.size(), Integer.MAX_VALUE));
took.forEach(this::onNext);
buf.removeAll(took);
}
}
}
//#job-manager
}
//#job-manager
@Test
public void demonstrateActorPublisherUsage() {

View file

@ -22,6 +22,7 @@ import akka.stream.actor.RequestStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.JavaTestKit;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -30,24 +31,24 @@ import java.util.*;
import static org.junit.Assert.assertEquals;
public class ActorSubscriberDocTest {
public class ActorSubscriberDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("ActorSubscriberDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
//#worker-pool
public static class WorkerPoolProtocol {

View file

@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import akka.stream.javadsl.GraphDSL;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -30,23 +31,24 @@ import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertArrayEquals;
public class BidiFlowDocTest {
public class BidiFlowDocTest extends AbstractJavaTest {
private static ActorSystem system;
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowDocTest");
system = ActorSystem.create("FlowDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
//#codec
static interface Message {}
static class Ping implements Message {
@ -132,58 +134,77 @@ public class BidiFlowDocTest {
.append(bytes)
.result();
}
public static class FrameParser extends PushPullStage<ByteString, ByteString> {
// this holds the received but not yet parsed bytes
private ByteString stash = ByteString.empty();
// this holds the current message length or -1 if at a boundary
private int needed = -1;
public static class FrameParser extends GraphStage<FlowShape<ByteString, ByteString>> {
public Inlet<ByteString> in = Inlet.create("FrameParser.in");
public Outlet<ByteString> out = Outlet.create("FrameParser.out");
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
@Override
public SyncDirective onPull(Context<ByteString> ctx) {
return run(ctx);
public FlowShape<ByteString, ByteString> shape() {
return shape;
}
@Override
public SyncDirective onPush(ByteString bytes, Context<ByteString> ctx) {
stash = stash.concat(bytes);
return run(ctx);
}
@Override
public TerminationDirective onUpstreamFinish(Context<ByteString> ctx) {
if (stash.isEmpty()) return ctx.finish();
else return ctx.absorbTermination(); // we still have bytes to emit
}
private SyncDirective run(Context<ByteString> ctx) {
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.size() < 4) return pullOrFinish(ctx);
else {
needed = stash.iterator().getInt(ByteOrder.LITTLE_ENDIAN);
stash = stash.drop(4);
return run(ctx); // cycle back to possibly already emit the next chunk
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
// this holds the received but not yet parsed bytes
private ByteString stash = ByteString.empty();
// this holds the current message length or -1 if at a boundary
private int needed = -1;
{
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
ByteString bytes = grab(in);
stash = stash.concat(bytes);
run();
}
@Override
public void onUpstreamFinish() throws Exception {
if (stash.isEmpty()) completeStage();
// wait with completion and let run() complete when the
// rest of the stash has been sent downstream
}
});
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
if (isClosed(in)) run();
else pull(in);
}
});
}
} else if (stash.size() < needed) {
// we are in the middle of a message, need more bytes
return pullOrFinish(ctx);
} else {
// we have enough to emit at least one message, so do it
final ByteString emit = stash.take(needed);
stash = stash.drop(needed);
needed = -1;
return ctx.push(emit);
}
}
/*
* After having called absorbTermination() we cannot pull any more, so if we need
* more data we will just have to give up.
*/
private SyncDirective pullOrFinish(Context<ByteString> ctx) {
if (ctx.isFinishing()) return ctx.finish();
else return ctx.pull();
private void run() {
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.size() < 4) {
if (isClosed(in)) completeStage();
else pull(in);
} else {
needed = stash.iterator().getInt(ByteOrder.LITTLE_ENDIAN);
stash = stash.drop(4);
run(); // cycle back to possibly already emit the next chunk
}
} else if (stash.size() < needed) {
// we are in the middle of a message, need more bytes
// or in is already closed and we cannot pull any more
if (isClosed(in)) completeStage();
else pull(in);
} else {
// we have enough to emit at least one message, so do it
final ByteString emit = stash.take(needed);
stash = stash.drop(needed);
needed = -1;
push(out, emit);
}
}
};
}
}
@ -192,7 +213,7 @@ public class BidiFlowDocTest {
final FlowShape<ByteString, ByteString> top =
b.add(Flow.of(ByteString.class).map(BidiFlowDocTest::addLengthHeader));
final FlowShape<ByteString, ByteString> bottom =
b.add(Flow.of(ByteString.class).transform(() -> new FrameParser()));
b.add(Flow.of(ByteString.class).via(new FrameParser()));
return BidiShape.fromFlows(top, bottom);
}));
//#framing

View file

@ -10,6 +10,7 @@ import java.util.concurrent.CompletionStage;
import akka.NotUsed;
import akka.stream.ClosedShape;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -26,23 +27,24 @@ import scala.concurrent.*;
import scala.Option;
public class CompositionDocTest {
public class CompositionDocTest extends AbstractJavaTest {
private static ActorSystem system;
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowDocTest");
system = ActorSystem.create("CompositionDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void nonNestedFlow() throws Exception {
//#non-nested-flow

View file

@ -13,6 +13,7 @@ import java.util.stream.Stream;
import akka.NotUsed;
import akka.japi.Pair;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -30,22 +31,23 @@ import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
public class FlowDocTest {
public class FlowDocTest extends AbstractJavaTest {
private static ActorSystem system;
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowDocTest");
}
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
final Materializer mat = ActorMaterializer.create(system);
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
@Test
public void sourceIsImmutable() throws Exception {

View file

@ -7,9 +7,11 @@ import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -29,7 +31,7 @@ import akka.stream.javadsl.Source;
import akka.japi.function.Function;
import akka.testkit.JavaTestKit;
public class FlowErrorDocTest {
public class FlowErrorDocTest extends AbstractJavaTest {
private static ActorSystem system;
@ -44,7 +46,7 @@ public class FlowErrorDocTest {
system = null;
}
@Test(expected = ArithmeticException.class)
@Test(expected = ExecutionException.class)
public void demonstrateFailStream() throws Exception {
//#stop
final Materializer mat = ActorMaterializer.create(system);
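
The expected exception changes presumably because the sample now waits on the stream's CompletionStage via toCompletableFuture().get(), which wraps the stream failure; a minimal sketch of that behavior (values hypothetical):

    final CompletionStage<Integer> result =
      Source.single(0).map(x -> 100 / x).runWith(Sink.head(), mat);

    // the ArithmeticException from the failed stream arrives wrapped:
    result.toCompletableFuture().get(); // throws ExecutionException (cause: ArithmeticException)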

View file

@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import akka.stream.ClosedShape;
import akka.stream.SourceShape;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -26,23 +27,24 @@ import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
public class FlowGraphDocTest {
public class FlowGraphDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowGraphDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void demonstrateBuildSimpleGraph() throws Exception {
//#simple-flow-graph

View file

@ -7,6 +7,7 @@ package docs.stream;
import static org.junit.Assert.assertEquals;
import akka.NotUsed;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -15,23 +16,24 @@ import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
public class FlowParallelismDocTest {
public class FlowParallelismDocTest extends AbstractJavaTest {
private static ActorSystem system;
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowDocTest");
system = ActorSystem.create("FlowParallellismDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
static class ScoopOfBatter {}
static class HalfCookedPancake {}
static class Pancake {}

View file

@ -1,257 +0,0 @@
/**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.stream.stage.*;
import akka.stream.testkit.*;
import akka.stream.testkit.javadsl.*;
import akka.testkit.JavaTestKit;
public class FlowStagesDocTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowStagesDocTest");
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
final Materializer mat = ActorMaterializer.create(system);
static //#one-to-one
public class Map<A, B> extends PushPullStage<A, B> {
private final Function<A, B> f;
public Map(Function<A, B> f) {
this.f = f;
}
@Override public SyncDirective onPush(A elem, Context<B> ctx) {
return ctx.push(f.apply(elem));
}
@Override public SyncDirective onPull(Context<B> ctx) {
return ctx.pull();
}
}
//#one-to-one
static //#many-to-one
public class Filter<A> extends PushPullStage<A, A> {
private final Predicate<A> p;
public Filter(Predicate<A> p) {
this.p = p;
}
@Override public SyncDirective onPush(A elem, Context<A> ctx) {
if (p.test(elem)) return ctx.push(elem);
else return ctx.pull();
}
@Override public SyncDirective onPull(Context<A> ctx) {
return ctx.pull();
}
}
//#many-to-one
//#one-to-many
class Duplicator<A> extends PushPullStage<A, A> {
private A lastElem = null;
private boolean oneLeft = false;
@Override public SyncDirective onPush(A elem, Context<A> ctx) {
lastElem = elem;
oneLeft = true;
return ctx.push(elem);
}
@Override public SyncDirective onPull(Context<A> ctx) {
if (!ctx.isFinishing()) {
// the main pulling logic is below as it is demonstrated on the illustration
if (oneLeft) {
oneLeft = false;
return ctx.push(lastElem);
} else
return ctx.pull();
} else {
// If we need to emit a final element after the upstream
// finished
if (oneLeft) return ctx.pushAndFinish(lastElem);
else return ctx.finish();
}
}
@Override public TerminationDirective onUpstreamFinish(Context<A> ctx) {
return ctx.absorbTermination();
}
}
//#one-to-many
static//#pushstage
public class Map2<A, B> extends PushStage<A, B> {
private final Function<A, B> f;
public Map2(Function<A, B> f) {
this.f = f;
}
@Override public SyncDirective onPush(A elem, Context<B> ctx) {
return ctx.push(f.apply(elem));
}
}
public class Filter2<A> extends PushStage<A, A> {
private final Predicate<A> p;
public Filter2(Predicate<A> p) {
this.p = p;
}
@Override public SyncDirective onPush(A elem, Context<A> ctx) {
if (p.test(elem)) return ctx.push(elem);
else return ctx.pull();
}
}
//#pushstage
static //#doubler-stateful
public class Duplicator2<A> extends StatefulStage<A, A> {
@Override public StageState<A, A> initial() {
return new StageState<A, A>() {
@Override public SyncDirective onPush(A elem, Context<A> ctx) {
return emit(Arrays.asList(elem, elem).iterator(), ctx);
}
};
}
}
//#doubler-stateful
@Test
public void demonstrateVariousPushPullStages() throws Exception {
final Sink<Integer, CompletionStage<List<Integer>>> sink =
Flow.of(Integer.class).limit(10).toMat(Sink.seq(), Keep.right());
//#stage-chain
final RunnableGraph<CompletionStage<List<Integer>>> runnable =
Source
.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.transform(() -> new Filter<Integer>(elem -> elem % 2 == 0))
.transform(() -> new Duplicator<Integer>())
.transform(() -> new Map<Integer, Integer>(elem -> elem / 2))
.toMat(sink, Keep.right());
//#stage-chain
assertEquals(Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5),
runnable.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS));
}
//#detached
class Buffer2<T> extends DetachedStage<T, T> {
final private Integer SIZE = 2;
final private List<T> buf = new ArrayList<>(SIZE);
private Integer capacity = SIZE;
private boolean isFull() {
return capacity == 0;
}
private boolean isEmpty() {
return capacity == SIZE;
}
private T dequeue() {
capacity += 1;
return buf.remove(0);
}
private void enqueue(T elem) {
capacity -= 1;
buf.add(elem);
}
public DownstreamDirective onPull(DetachedContext<T> ctx) {
if (isEmpty()) {
if (ctx.isFinishing()) return ctx.finish(); // No more elements will arrive
else return ctx.holdDownstream(); // waiting until new elements
} else {
final T next = dequeue();
if (ctx.isHoldingUpstream()) return ctx.pushAndPull(next); // release upstream
else return ctx.push(next);
}
}
public UpstreamDirective onPush(T elem, DetachedContext<T> ctx) {
enqueue(elem);
if (isFull()) return ctx.holdUpstream(); // Queue is now full, wait until new empty slot
else {
if (ctx.isHoldingDownstream()) return ctx.pushAndPull(dequeue()); // Release downstream
else return ctx.pull();
}
}
public TerminationDirective onUpstreamFinish(DetachedContext<T> ctx) {
if (!isEmpty()) return ctx.absorbTermination(); // still need to flush from buffer
else return ctx.finish(); // already empty, finishing
}
}
//#detached
@Test
public void demonstrateDetachedStage() throws Exception {
final Pair<TestPublisher.Probe<Integer>,TestSubscriber.Probe<Integer>> pair =
TestSource.<Integer>probe(system)
.transform(() -> new Buffer2<Integer>())
.toMat(TestSink.probe(system), Keep.both())
.run(mat);
final TestPublisher.Probe<Integer> pub = pair.first();
final TestSubscriber.Probe<Integer> sub = pair.second();
final FiniteDuration timeout = Duration.create(100, TimeUnit.MILLISECONDS);
sub.request(2);
sub.expectNoMsg(timeout);
pub.sendNext(1);
pub.sendNext(2);
sub.expectNext(1, 2);
pub.sendNext(3);
pub.sendNext(4);
sub.expectNoMsg(timeout);
sub.request(2);
sub.expectNext(3, 4);
pub.sendComplete();
sub.expectComplete();
}
}
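
The PushPullStage samples deleted above (Map, Filter, Duplicator, Buffer2) are superseded by GraphStage-based equivalents elsewhere in the docs. As a rough sketch of the migration, following the same pattern as the FrameParser rewrite above (not the exact replacement code), the one-to-one Map stage becomes:

    public class Map<A, B> extends GraphStage<FlowShape<A, B>> {
      private final Function<A, B> f;

      public final Inlet<A> in = Inlet.create("Map.in");
      public final Outlet<B> out = Outlet.create("Map.out");
      private final FlowShape<A, B> shape = FlowShape.of(in, out);

      public Map(Function<A, B> f) {
        this.f = f;
      }

      @Override
      public FlowShape<A, B> shape() {
        return shape;
      }

      @Override
      public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape) {
          {
            setHandler(in, new AbstractInHandler() {
              @Override
              public void onPush() throws Exception {
                push(out, f.apply(grab(in)));   // map each element and pass it on
              }
            });
            setHandler(out, new AbstractOutHandler() {
              @Override
              public void onPull() throws Exception {
                pull(in);                       // demand is simply forwarded upstream
              }
            });
          }
        };
      }
    }

Such a stage is attached with .via(new Map<>(elem -> elem / 2)) instead of the old .transform(...) factory.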

View file

@ -3,6 +3,7 @@ package docs.stream;
import java.util.Arrays;
import akka.NotUsed;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -14,24 +15,24 @@ import akka.stream.scaladsl.MergePreferred.MergePreferredShape;
import akka.testkit.JavaTestKit;
public class GraphCyclesDocTest {
public class GraphCyclesDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("GraphCyclesDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
final static SilenceSystemOut.System System = SilenceSystemOut.get();
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3, 4, 5));

View file

@ -18,6 +18,7 @@ import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.testkit.JavaTestKit;
import akka.japi.Function;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -37,22 +38,23 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class GraphStageDocTest {
public class GraphStageDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FlowGraphDocTest");
system = ActorSystem.create("GraphStageDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
//#simple-source
public class NumbersSource extends GraphStage<SourceShape<Integer>> {

View file

@ -14,6 +14,7 @@ import akka.testkit.TestProbe;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import docs.stream.TwitterStreamQuickstartDocTest.Model.Author;
import docs.stream.TwitterStreamQuickstartDocTest.Model.Tweet;
import org.junit.AfterClass;
@ -32,11 +33,12 @@ import static docs.stream.TwitterStreamQuickstartDocTest.Model.AKKA;
import static docs.stream.TwitterStreamQuickstartDocTest.Model.tweets;
import static junit.framework.TestCase.assertTrue;
public class IntegrationDocTest {
public class IntegrationDocTest extends AbstractJavaTest {
private static final SilenceSystemOut.System System = SilenceSystemOut.get();
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
@ -51,15 +53,16 @@ public class IntegrationDocTest {
"akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox\n");
system = ActorSystem.create("ActorPublisherDocTest", config);
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
class AddressSystem {
//#email-address-lookup

View file

@ -14,6 +14,7 @@ import java.util.stream.DoubleStream;
import java.util.stream.Stream;
import akka.NotUsed;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -36,23 +37,24 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.util.Random;
public class RateTransformationDocTest {
public class RateTransformationDocTest extends AbstractJavaTest {
private static ActorSystem system;
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RateTransformationDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
final Random r = new Random();
@Test

View file

@ -13,10 +13,12 @@ import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import docs.AbstractJavaTest;
import docs.stream.TwitterStreamQuickstartDocTest.Model.Author;
import docs.stream.TwitterStreamQuickstartDocTest.Model.Tweet;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
//#imports
@ -32,24 +34,30 @@ import java.lang.Exception;
import static docs.stream.ReactiveStreamsDocTest.Fixture.Data.authors;
import static docs.stream.TwitterStreamQuickstartDocTest.Model.AKKA;
public class ReactiveStreamsDocTest {
public class ReactiveStreamsDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
static TestProbe storageProbe;
static TestProbe alertProbe;
@BeforeClass
public static void setup() {
system = ActorSystem.create("ReactiveStreamsDocTest");
mat = ActorMaterializer.create(system);
storageProbe = new TestProbe(system);
alertProbe = new TestProbe(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
storageProbe = null;
alertProbe = null;
}
final Materializer mat = ActorMaterializer.create(system);
static class Fixture {
// below class additionally helps with aligning code includes nicely
static class Data {
@ -77,9 +85,6 @@ public class ReactiveStreamsDocTest {
}
}
final TestProbe storageProbe = TestProbe.apply(system);
final TestProbe alertProbe = TestProbe.apply(system);
final Fixture.RS rs = new Fixture.RS() {
@Override
public Publisher<Tweet> tweets() {

View file

@ -7,6 +7,7 @@ import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -18,25 +19,26 @@ import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
public class StreamBuffersRateDocTest {
public class StreamBuffersRateDocTest extends AbstractJavaTest {
static class Job {}
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("StreamBufferRateDocTest");
system = ActorSystem.create("StreamBuffersDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
final SilenceSystemOut.System System = SilenceSystemOut.get();
@Test

View file

@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit;
import akka.Done;
import akka.NotUsed;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -24,24 +25,24 @@ import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
public class StreamPartialFlowGraphDocTest {
public class StreamPartialFlowGraphDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("StreamPartialFlowGraphDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void demonstrateBuildWithOpenPorts() throws Exception {
//#simple-partial-flow-graph

View file

@ -12,6 +12,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import docs.AbstractJavaTest;
import org.junit.*;
import static org.junit.Assert.assertEquals;
@ -29,23 +30,24 @@ import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
public class StreamTestKitDocTest {
public class StreamTestKitDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("StreamTestKitDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void strictCollection() throws Exception {
//#strict-collection

View file

@ -11,6 +11,7 @@ import akka.japi.JavaPartialFunction;
import akka.testkit.JavaTestKit;
import akka.stream.*;
import akka.stream.javadsl.*;
import docs.AbstractJavaTest;
import docs.stream.TwitterStreamQuickstartDocTest.Model.Author;
import docs.stream.TwitterStreamQuickstartDocTest.Model.Hashtag;
import docs.stream.TwitterStreamQuickstartDocTest.Model.Tweet;
@ -36,20 +37,23 @@ import static docs.stream.TwitterStreamQuickstartDocTest.Model.AKKA;
import static docs.stream.TwitterStreamQuickstartDocTest.Model.tweets;
@SuppressWarnings("unused")
public class TwitterStreamQuickstartDocTest {
public class TwitterStreamQuickstartDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("SampleActorTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
static abstract class Model {
@ -213,9 +217,6 @@ public class TwitterStreamQuickstartDocTest {
}
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void demonstrateFilterAndMap() {
final SilenceSystemOut.System System = SilenceSystemOut.get();

View file

@ -13,6 +13,7 @@ import akka.stream.ActorAttributes;
import akka.stream.io.IOResult;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.FileIO;
import docs.AbstractJavaTest;
import docs.stream.SilenceSystemOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -23,22 +24,25 @@ import akka.stream.*;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
public class StreamFileDocTest {
public class StreamFileDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("StreamFileDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
final SilenceSystemOut.System System = SilenceSystemOut.get();

View file

@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import akka.NotUsed;
import akka.stream.io.Framing;
import docs.AbstractJavaTest;
import docs.stream.SilenceSystemOut;
import java.net.InetSocketAddress;
@ -26,22 +27,24 @@ import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import akka.util.ByteString;
public class StreamTcpDocTest {
public class StreamTcpDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("StreamTcpDocTest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
final SilenceSystemOut.System System = SilenceSystemOut.get();
@ -77,9 +80,9 @@ public class StreamTcpDocTest {
final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class)
.via(Framing.delimiter(ByteString.fromString("\n"), 256, false))
.map(bytes -> bytes.utf8String())
.map(ByteString::utf8String)
.map(s -> s + "!!!\n")
.map(s -> ByteString.fromString(s));
.map(ByteString::fromString);
connection.handleWith(echo, mat);
}, mat);
@ -95,50 +98,33 @@ public class StreamTcpDocTest {
final TestProbe serverProbe = new TestProbe(system);
final Source<IncomingConnection,CompletionStage<ServerBinding>> connections =
Tcp.get(system).bind(localhost.getHostName(), localhost.getPort()); // TODO getHostString in Java7
Tcp.get(system).bind(localhost.getHostString(), localhost.getPort());
//#welcome-banner-chat-server
connections.runForeach(connection -> {
// server logic, parses incoming commands
final PushStage<String, String> commandParser = new PushStage<String, String>() {
@Override public SyncDirective onPush(String elem, Context<String> ctx) {
if (elem.equals("BYE"))
return ctx.finish();
else
return ctx.push(elem + "!");
}
};
final Flow<String, String, NotUsed> commandParser =
Flow.<String>create()
.takeWhile(elem -> !elem.equals("BYE"))
.map(elem -> elem + "!");
final String welcomeMsg = "Welcome to: " + connection.localAddress() +
" you are: " + connection.remoteAddress() + "!\n";
" you are: " + connection.remoteAddress() + "!";
final Source<ByteString, NotUsed> welcome =
Source.single(ByteString.fromString(welcomeMsg));
final Flow<ByteString, ByteString, NotUsed> echoFlow =
final Source<String, NotUsed> welcome = Source.single(welcomeMsg);
final Flow<ByteString, ByteString, NotUsed> serverLogic =
Flow.of(ByteString.class)
.via(Framing.delimiter(ByteString.fromString("\n"), 256, false))
.map(bytes -> bytes.utf8String())
.map(ByteString::utf8String)
//#welcome-banner-chat-server
.map(command -> {
serverProbe.ref().tell(command, null);
return command;
})
//#welcome-banner-chat-server
.transform(() -> commandParser)
.via(commandParser)
.merge(welcome)
.map(s -> s + "\n")
.map(s -> ByteString.fromString(s));
final Flow<ByteString, ByteString, NotUsed> serverLogic =
Flow.fromGraph(GraphDSL.create(builder -> {
final UniformFanInShape<ByteString, ByteString> concat =
builder.add(Concat.create());
final FlowShape<ByteString, ByteString> echo = builder.add(echoFlow);
builder
.from(builder.add(welcome)).toFanIn(concat)
.from(echo).toFanIn(concat);
return FlowShape.of(echo.in(), concat.out());
}));
.map(ByteString::fromString);
connection.handleWith(serverLogic, mat);
}, mat);
@ -156,27 +142,25 @@ public class StreamTcpDocTest {
final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection =
Tcp.get(system).outgoingConnection(localhost.getHostString(), localhost.getPort());
//#repl-client
final PushStage<String, ByteString> replParser = new PushStage<String, ByteString>() {
@Override public SyncDirective onPush(String elem, Context<ByteString> ctx) {
if (elem.equals("q"))
return ctx.pushAndFinish(ByteString.fromString("BYE\n"));
else
return ctx.push(ByteString.fromString(elem + "\n"));
}
};
final Flow<String, ByteString, NotUsed> replParser =
Flow.<String>create()
.takeWhile(elem -> !elem.equals("q"))
.concat(Source.single("BYE")) // will run after the original flow completes
.map(elem -> ByteString.fromString(elem + "\n"));
final Flow<ByteString, ByteString, NotUsed> repl = Flow.of(ByteString.class)
.via(Framing.delimiter(ByteString.fromString("\n"), 256, false))
.map(bytes -> bytes.utf8String())
.map(ByteString::utf8String)
.map(text -> {System.out.println("Server: " + text); return "next";})
.map(elem -> readLine("> "))
.transform(() -> replParser);
.via(replParser);
connection.join(repl).run(mat);
//#repl-client
//#repl-client
}
serverProbe.expectMsg("Hello world");
serverProbe.expectMsg("What a lovely day");
serverProbe.expectMsg("BYE");

View file

@ -5,15 +5,11 @@ package docs.stream.javadsl.cookbook;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.*;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.stage.Context;
import akka.stream.stage.PushPullStage;
import akka.stream.stage.PushStage;
import akka.stream.stage.SyncDirective;
import akka.stream.stage.*;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import org.junit.AfterClass;
@ -24,6 +20,7 @@ import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -31,19 +28,21 @@ import static org.junit.Assert.assertTrue;
public class RecipeByteStrings extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeByteStrings");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
final Source<ByteString, NotUsed> rawBytes = Source.from(Arrays.asList(
ByteString.fromArray(new byte[] { 1, 2 }),
@ -57,42 +56,77 @@ public class RecipeByteStrings extends RecipeTest {
final int CHUNK_LIMIT = 2;
//#bytestring-chunker
class Chunker extends PushPullStage<ByteString, ByteString> {
class Chunker extends GraphStage<FlowShape<ByteString, ByteString>> {
private final int chunkSize;
private ByteString buffer = ByteString.empty();
public Inlet<ByteString> in = Inlet.<ByteString>create("Chunker.in");
public Outlet<ByteString> out = Outlet.<ByteString>create("Chunker.out");
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
public Chunker(int chunkSize) {
this.chunkSize = chunkSize;
}
@Override
public SyncDirective onPush(ByteString elem, Context<ByteString> ctx) {
buffer = buffer.concat(elem);
return emitChunkOrPull(ctx);
public FlowShape<ByteString, ByteString> shape() {
return shape;
}
@Override
public SyncDirective onPull(Context<ByteString> ctx) {
return emitChunkOrPull(ctx);
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
private ByteString buffer = ByteString.empty();
{
setHandler(out, new AbstractOutHandler(){
@Override
public void onPull() throws Exception {
if (isClosed(in)) emitChunk();
else pull(in);
}
});
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
ByteString elem = grab(in);
buffer = buffer.concat(elem);
emitChunk();
}
@Override
public void onUpstreamFinish() throws Exception {
if (buffer.isEmpty()) completeStage();
// elements left in buffer, keep accepting downstream pulls
// and push from the buffer until it has been drained
}
});
}
private void emitChunk() {
if (buffer.isEmpty()) {
if (isClosed(in)) completeStage();
else pull(in);
} else {
Tuple2<ByteString, ByteString> split = buffer.splitAt(chunkSize);
ByteString chunk = split._1();
buffer = split._2();
push(out, chunk);
}
}
};
}
public SyncDirective emitChunkOrPull(Context<ByteString> ctx) {
if (buffer.isEmpty()) {
return ctx.pull();
} else {
Tuple2<ByteString, ByteString> split = buffer.splitAt(chunkSize);
ByteString emit = split._1();
buffer = split._2();
return ctx.push(emit);
}
}
}
//#bytestring-chunker
{
//#bytestring-chunker2
Source<ByteString, NotUsed> chunksStream =
rawBytes.transform(() -> new Chunker(CHUNK_LIMIT));
rawBytes.via(new Chunker(CHUNK_LIMIT));
//#bytestring-chunker2
CompletionStage<List<ByteString>> chunksFuture = chunksStream.limit(10).runWith(Sink.seq(), mat);
@ -119,22 +153,49 @@ public class RecipeByteStrings extends RecipeTest {
final int SIZE_LIMIT = 9;
//#bytes-limiter
class ByteLimiter extends PushStage<ByteString, ByteString> {
class ByteLimiter extends GraphStage<FlowShape<ByteString, ByteString>> {
final long maximumBytes;
private int count = 0;
public Inlet<ByteString> in = Inlet.<ByteString>create("ByteLimiter.in");
public Outlet<ByteString> out = Outlet.<ByteString>create("ByteLimiter.out");
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
public ByteLimiter(long maximumBytes) {
this.maximumBytes = maximumBytes;
}
@Override
public SyncDirective onPush(ByteString chunk, Context<ByteString> ctx) {
count += chunk.size();
if (count > maximumBytes) {
return ctx.fail(new IllegalStateException("Too many bytes"));
} else {
return ctx.push(chunk);
}
public FlowShape<ByteString, ByteString> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
private int count = 0;
{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
pull(in);
}
});
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
ByteString chunk = grab(in);
count += chunk.size();
if (count > maximumBytes) {
failStage(new IllegalStateException("Too many bytes"));
} else {
push(out, chunk);
}
}
});
}
};
}
}
//#bytes-limiter
@ -142,7 +203,7 @@ public class RecipeByteStrings extends RecipeTest {
{
//#bytes-limiter2
Flow<ByteString, ByteString, NotUsed> limiter =
Flow.of(ByteString.class).transform(() -> new ByteLimiter(SIZE_LIMIT));
Flow.of(ByteString.class).via(new ByteLimiter(SIZE_LIMIT));
//#bytes-limiter2
final Source<ByteString, NotUsed> bytes1 = Source.from(Arrays.asList(
@ -167,11 +228,12 @@ public class RecipeByteStrings extends RecipeTest {
boolean thrown = false;
try {
bytes2.via(limiter).limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
} catch (IllegalStateException ex) {
} catch (ExecutionException ex) {
assertEquals(ex.getCause().getClass(), IllegalStateException.class);
thrown = true;
}
assertTrue("Expected IllegalStateException to be thrown", thrown);
}
};
}
@ -187,7 +249,7 @@ public class RecipeByteStrings extends RecipeTest {
ByteString.fromArray(new byte[] { 7, 8, 9 })));
//#compacting-bytestrings
Source<ByteString, NotUsed> compacted = rawBytes.map(bs -> bs.compact());
Source<ByteString, NotUsed> compacted = rawBytes.map(ByteString::compact);
//#compacting-bytestrings
List<ByteString> got = compacted.limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);

View file

@ -5,14 +5,10 @@ package docs.stream.javadsl.cookbook;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.*;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.stage.Context;
import akka.stream.stage.PushPullStage;
import akka.stream.stage.SyncDirective;
import akka.stream.stage.TerminationDirective;
import akka.stream.stage.*;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import org.junit.AfterClass;
@ -21,6 +17,7 @@ import org.junit.Test;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@ -28,54 +25,84 @@ import static org.junit.Assert.assertEquals;
public class RecipeDigest extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeDigest");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
//#calculating-digest
class DigestCalculator extends GraphStage<FlowShape<ByteString, ByteString>> {
private final String algorithm;
public Inlet<ByteString> in = Inlet.<ByteString>create("DigestCalculator.in");
public Outlet<ByteString> out = Outlet.<ByteString>create("DigestCalculator.out");
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
public DigestCalculator(String algorithm) {
this.algorithm = algorithm;
}
@Override
public FlowShape<ByteString, ByteString> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
final MessageDigest digest;
{
try {
digest = MessageDigest.getInstance(algorithm);
} catch(NoSuchAlgorithmException ex) {
throw new RuntimeException(ex);
}
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
pull(in);
}
});
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
ByteString chunk = grab(in);
digest.update(chunk.toArray());
pull(in);
}
@Override
public void onUpstreamFinish() throws Exception {
// If the stream is finished, we need to emit the digest
// before completing
emit(out, ByteString.fromArray(digest.digest()));
completeStage();
}
});
}
};
}
}
//#calculating-digest
@Test
public void work() throws Exception {
new JavaTestKit(system) {
//#calculating-digest
public PushPullStage<ByteString, ByteString> digestCalculator(String algorithm)
throws NoSuchAlgorithmException {
return new PushPullStage<ByteString, ByteString>() {
final MessageDigest digest = MessageDigest.getInstance(algorithm);
@Override
public SyncDirective onPush(ByteString chunk, Context<ByteString> ctx) {
digest.update(chunk.toArray());
return ctx.pull();
}
@Override
public SyncDirective onPull(Context<ByteString> ctx) {
if (ctx.isFinishing()) {
return ctx.pushAndFinish(ByteString.fromArray(digest.digest()));
} else {
return ctx.pull();
}
}
@Override
public TerminationDirective onUpstreamFinish(Context<ByteString> ctx) {
// If the stream is finished, we need to emit the last element in the onPull block.
// It is not allowed to directly emit elements from a termination block
// (onUpstreamFinish or onUpstreamFailure)
return ctx.absorbTermination();
}
};
}
//#calculating-digest
{
Source<ByteString, NotUsed> data = Source.from(Arrays.asList(
@ -84,7 +111,7 @@ public class RecipeDigest extends RecipeTest {
//#calculating-digest2
final Source<ByteString, NotUsed> digest = data
.transform(() -> digestCalculator("SHA-256"));
.via(new DigestCalculator("SHA-256"));
//#calculating-digest2
ByteString got = digest.runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);

View file

@ -20,20 +20,21 @@ import java.util.concurrent.CompletionStage;
public class RecipeDroppyBroadcast extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements");
system = ActorSystem.create("RecipeDroppyBroadcast");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void work() throws Exception {
new JavaTestKit(system) {

View file

@ -22,20 +22,21 @@ import static org.junit.Assert.assertEquals;
public class RecipeFlattenList extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeFlattenList");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void workWithMapConcat() throws Exception {
new JavaTestKit(system) {

View file

@ -32,20 +32,21 @@ import static junit.framework.TestCase.assertTrue;
public class RecipeGlobalRateLimit extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeGlobalRateLimit");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
static
//#global-limiter-actor
public class Limiter extends AbstractActor {

View file

@ -5,20 +5,17 @@ package docs.stream.javadsl.cookbook;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.*;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.stage.DetachedContext;
import akka.stream.stage.DetachedStage;
import akka.stream.stage.DownstreamDirective;
import akka.stream.stage.UpstreamDirective;
import akka.stream.stage.*;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -28,64 +25,112 @@ import java.util.concurrent.TimeUnit;
public class RecipeHold extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeMultiGroupBy");
system = ActorSystem.create("RecipeHold");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
//#hold-version-1
class HoldWithInitial<T> extends DetachedStage<T, T> {
private T currentValue;
class HoldWithInitial<T> extends GraphStage<FlowShape<T, T>> {
public Inlet<T> in = Inlet.<T>create("HoldWithInitial.in");
public Outlet<T> out = Outlet.<T>create("HoldWithInitial.out");
private FlowShape<T, T> shape = FlowShape.of(in, out);
private final T initial;
public HoldWithInitial(T initial) {
currentValue = initial;
this.initial = initial;
}
@Override
public UpstreamDirective onPush(T elem, DetachedContext<T> ctx) {
currentValue = elem;
return ctx.pull();
public FlowShape<T, T> shape() {
return shape;
}
@Override
public DownstreamDirective onPull(DetachedContext<T> ctx) {
return ctx.push(currentValue);
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
private T currentValue = initial;
{
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
currentValue = grab(in);
pull(in);
}
});
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
push(out, currentValue);
}
});
}
@Override
public void preStart() {
pull(in);
}
};
}
}
//#hold-version-1
//#hold-version-2
class HoldWithWait<T> extends DetachedStage<T, T> {
private T currentValue = null;
private boolean waitingFirstValue = true;
class HoldWithWait<T> extends GraphStage<FlowShape<T, T>> {
public Inlet<T> in = Inlet.<T>create("HoldWithWait.in");
public Outlet<T> out = Outlet.<T>create("HoldWithWait.out");
private FlowShape<T, T> shape = FlowShape.of(in, out);
@Override
public UpstreamDirective onPush(T elem, DetachedContext<T> ctx) {
currentValue = elem;
waitingFirstValue = false;
if (ctx.isHoldingDownstream()) {
return ctx.pushAndPull(currentValue);
} else {
return ctx.pull();
}
public FlowShape<T, T> shape() {
return shape;
}
@Override
public DownstreamDirective onPull(DetachedContext<T> ctx) {
if (waitingFirstValue) {
return ctx.holdDownstream();
} else {
return ctx.push(currentValue);
}
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
private T currentValue = null;
private boolean waitingFirstValue = true;
{
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() throws Exception {
currentValue = grab(in);
if (waitingFirstValue) {
waitingFirstValue = false;
if (isAvailable(out)) push(out, currentValue);
}
pull(in);
}
});
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
if (!waitingFirstValue) push(out, currentValue);
}
});
}
@Override
public void preStart() {
pull(in);
}
};
}
}
//#hold-version-2
@ -98,7 +143,7 @@ public class RecipeHold extends RecipeTest {
final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.probe(system);
Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubSub =
source.transform(() -> new HoldWithInitial<>(0)).toMat(sink, Keep.both()).run(mat);
source.via(new HoldWithInitial<>(0)).toMat(sink, Keep.both()).run(mat);
TestPublisher.Probe<Integer> pub = pubSub.first();
TestSubscriber.Probe<Integer> sub = pubSub.second();
@ -126,7 +171,7 @@ public class RecipeHold extends RecipeTest {
final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.probe(system);
Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubSub =
source.transform(() -> new HoldWithWait<>()).toMat(sink, Keep.both()).run(mat);
source.via(new HoldWithWait<>()).toMat(sink, Keep.both()).run(mat);
TestPublisher.Probe<Integer> pub = pubSub.first();
TestSubscriber.Probe<Integer> sub = pubSub.second();

View file

@ -18,20 +18,21 @@ import java.util.concurrent.TimeUnit;
public class RecipeKeepAlive extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeKeepAlive");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
class Tick {}
public final Tick TICK = new Tick();

View file

@ -25,20 +25,21 @@ import java.util.Arrays;
public class RecipeLoggingElements extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements", ConfigFactory.parseString("akka.loglevel=DEBUG\nakka.loggers = [akka.testkit.TestEventListener]"));
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void workWithPrintln() throws Exception {
new JavaTestKit(system) {

View file

@ -22,20 +22,21 @@ import java.util.concurrent.TimeUnit;
public class RecipeManualTrigger extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeKeepAlive");
system = ActorSystem.create("RecipeManualTrigger");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
class Trigger {
}

View file

@ -29,20 +29,21 @@ import java.util.concurrent.TimeUnit;
public class RecipeMissedTicks extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeMultiGroupBy");
system = ActorSystem.create("RecipeMissedTicks");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void work() throws Exception {
new JavaTestKit(system) {

View file

@ -31,20 +31,21 @@ import static junit.framework.TestCase.assertTrue;
public class RecipeMultiGroupByTest extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeMultiGroupBy");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
static class Topic {
private final String name;

View file

@ -22,20 +22,21 @@ import java.util.concurrent.TimeUnit;
public class RecipeParseLines extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements");
system = ActorSystem.create("RecipeParseLines");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void parseLines() throws Exception {
final Source<ByteString, NotUsed> rawData = Source.from(Arrays.asList(

View file

@ -31,20 +31,21 @@ import java.util.stream.Collectors;
public class RecipeReduceByKeyTest extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements");
system = ActorSystem.create("RecipeReduceByKey");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void work() throws Exception {
new JavaTestKit(system) {

View file

@ -24,20 +24,20 @@ import java.util.concurrent.TimeUnit;
public class RecipeSeq extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeLoggingElements");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void drainSourceToList() throws Exception {
new JavaTestKit(system) {

View file

@ -25,20 +25,21 @@ import java.util.concurrent.TimeUnit;
public class RecipeSimpleDrop extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeSimpleDrop");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
@Test
public void work() throws Exception {
new JavaTestKit(system) {

View file

@ -1,6 +1,8 @@
package docs.stream.javadsl.cookbook;
public class RecipeTest {
import docs.AbstractJavaTest;
public abstract class RecipeTest extends AbstractJavaTest {
final class Message {
public final String msg;

View file

@ -24,20 +24,21 @@ import static org.junit.Assert.assertTrue;
public class RecipeWorkerPool extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeWorkerPool");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
final Materializer mat = ActorMaterializer.create(system);
//#worker-pool
public static <In, Out> Flow<In, Out, NotUsed> balancer(
Flow<In, Out, NotUsed> worker, int workerCount) {

View file

@ -210,7 +210,7 @@ through the stream starting from the stage which failed, all the way downstream
Connections Source failures
^^^^^^^^^^^^^^^^^^^^^^^^^^^
In the example below we add a custom ``PushStage`` (see :ref:`stream-customize-java`) in order to react to the
In the example below we add a custom ``GraphStage`` (see :ref:`stream-customize-java`) in order to react to the
stream's failure. We signal a ``failureMonitor`` actor with the cause of why the stream is going down, and let the actor
handle the rest: maybe it'll decide to restart the server or shut down the ActorSystem; that, however, is not our concern anymore.
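
A minimal sketch of such a pass-through stage (an illustration only, assuming a ``failureMonitor`` ``ActorRef`` is in scope; not the example shipped with the documentation)::

    class FailureMonitorStage<A> extends GraphStage<FlowShape<A, A>> {
      public final Inlet<A> in = Inlet.<A>create("FailureMonitor.in");
      public final Outlet<A> out = Outlet.<A>create("FailureMonitor.out");
      private final FlowShape<A, A> shape = FlowShape.of(in, out);
      private final ActorRef failureMonitor;

      FailureMonitorStage(ActorRef failureMonitor) {
        this.failureMonitor = failureMonitor;
      }

      @Override
      public FlowShape<A, A> shape() {
        return shape;
      }

      @Override
      public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape) {
          {
            setHandler(in, new AbstractInHandler() {
              @Override
              public void onPush() throws Exception {
                push(out, grab(in)); // pass elements through untouched
              }

              @Override
              public void onUpstreamFailure(Throwable cause) throws Exception {
                // report the cause before letting the failure propagate
                failureMonitor.tell(cause, ActorRef.noSender());
                failStage(cause);
              }
            });
            setHandler(out, new AbstractOutHandler() {
              @Override
              public void onPull() throws Exception {
                pull(in);
              }
            });
          }
        };
      }
    }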

View file

@ -75,17 +75,15 @@ Calculating the digest of a ByteString stream
**Situation:** A stream of bytes is given as a stream of ``ByteStrings`` and we want to calculate the cryptographic digest
of the stream.
This recipe uses a :class:`PushPullStage` to host a mutable :class:`MessageDigest` class (part of the Java Cryptography
This recipe uses a :class:`GraphStage` to host a mutable :class:`MessageDigest` class (part of the Java Cryptography
API) and update it with the bytes arriving from the stream. When the stream starts, the ``onPull`` handler of the
stage is called, which just bubbles up the ``pull`` event to its upstream. As a response to this pull, a ByteString
chunk will arrive (``onPush``), which we use to update the digest; then we pull for the next chunk.
Eventually the stream of ``ByteStrings`` depletes and we get a notification about this event via ``onUpstreamFinish``.
At this point we want to emit the digest value, but we cannot do it in this handler directly. Instead we call
``ctx.absorbTermination()`` signalling to our context that we do not yet want to finish. When the environment decides that
we can emit further elements ``onPull`` is called again, and we see ``ctx.isFinishing()`` returning ``true`` (since the upstream
source has been depleted already). Since we only want to emit a final element it is enough to call ``ctx.pushAndFinish``
passing the digest ByteString to be emitted.
At this point we want to emit the digest value, but we cannot do it with ``push`` in this handler directly since there may
be no downstream demand. Instead we call ``emit``, which temporarily replaces the handlers, emits the provided value once
demand comes in and then resets the stage state. After that the stage is completed.
.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeDigest.java#calculating-digest
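
For reference, a hedged usage sketch (assuming ``data`` is a ``Source<ByteString, NotUsed>`` as in the test; the hex rendering is only an illustration)::

    final Source<ByteString, NotUsed> digest =
      data.via(new DigestCalculator("SHA-256"));

    // e.g. print the digest as a hex string once the stream completes
    digest
      .map(bytes -> javax.xml.bind.DatatypeConverter.printHexBinary(bytes.toArray()))
      .runWith(Sink.head(), mat)
      .thenAccept(System.out::println);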
@ -278,14 +276,11 @@ Create a stream processor that repeats the last element seen
of them is slowing down the other by dropping earlier unconsumed elements from the upstream if necessary, and repeating
the last value for the downstream if necessary.
We have two options to implement this feature. In both cases we will use :class:`DetachedStage` to build our custom
element (:class:`DetachedStage` is specifically designed for rate translating elements just like ``conflate``,
``expand`` or ``buffer``). In the first version we will use a provided initial value ``initial`` that will be used
We have two options to implement this feature. In both cases we will use :class:`GraphStage` to build our custom
element. In the first version we will use a provided initial value ``initial`` that will be used
to feed the downstream if no upstream element is ready yet. In the ``onPush()`` handler we just overwrite the
``currentValue`` variable and immediately relieve the upstream by calling ``pull()`` (remember, implementations of
:class:`DetachedStage` are not allowed to call ``push()`` as a response to ``onPush()`` or call ``pull()`` as a response
of ``onPull()``). The downstream ``onPull`` handler is very similar, we immediately relieve the downstream by
emitting ``currentValue``.
``currentValue`` variable and immediately relieve the upstream by calling ``pull()``. The downstream ``onPull`` handler
is very similar: we immediately relieve the downstream by emitting ``currentValue``.
.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeHold.java#hold-version-1
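
A hedged usage sketch (``lastSeen`` is just an illustrative name)::

    // repeats the last element seen, starting from the provided initial value 0
    final Flow<Integer, Integer, NotUsed> lastSeen =
      Flow.of(Integer.class).via(new HoldWithInitial<>(0));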
@ -296,9 +291,9 @@ case: if the very first element is not yet available.
We introduce a boolean variable ``waitingFirstValue`` to denote whether the first element has been provided or not
(alternatively an :class:`Optional` can be used for ``currentValue``, or, if the element type is a subclass of Object,
a null can be used for the same purpose). In the downstream ``onPull()`` handler the difference from the previous
version is that we call ``holdDownstream()`` if the first element is not yet available and thus blocking our downstream. The
upstream ``onPush()`` handler sets ``waitingFirstValue`` to false, and after checking if ``holdDownstream()`` has been called it
either relieves the upstream producer, or both the upstream producer and downstream consumer by calling ``pushAndPull()``
version is that we check whether we have received the first value and only emit if we have. This means that when the
first element comes in we must check whether downstream has already signalled demand, and in that case
push the element directly.
.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeHold.java#hold-version-2
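
The :class:`Optional` based variant mentioned above could look roughly like this (a fragment only, shown under the assumption that the rest of the class is identical to ``HoldWithWait`` and that ``java.util.Optional`` is imported)::

    private Optional<T> currentValue = Optional.empty();

    {
      setHandler(in, new AbstractInHandler() {
        @Override
        public void onPush() throws Exception {
          final boolean wasEmpty = !currentValue.isPresent();
          currentValue = Optional.of(grab(in));
          // if this was the very first element and downstream has already
          // signalled demand, we can push it right away
          if (wasEmpty && isAvailable(out)) push(out, currentValue.get());
          pull(in);
        }
      });
      setHandler(out, new AbstractOutHandler() {
        @Override
        public void onPull() throws Exception {
          if (currentValue.isPresent()) push(out, currentValue.get());
        }
      });
    }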
@ -343,14 +338,14 @@ Chunking up a stream of ByteStrings into limited size ByteStrings
the same sequence, but capping the size of ByteStrings. In other words we want to slice up ByteStrings into smaller
chunks if they exceed a size threshold.
This can be achieved with a single :class:`PushPullStage`. The main logic of our stage is in ``emitChunkOrPull()``
This can be achieved with a single :class:`GraphStage`. The main logic of our stage is in ``emitChunk()``
which implements the following logic:
* if the buffer is empty, we pull for more bytes
* if the buffer is empty and upstream is not closed, we pull for more bytes; if upstream is closed, we complete the stage
* if the buffer is non-empty, we split it according to the ``chunkSize``. This gives the next chunk that we will emit,
  and an empty or non-empty remaining buffer.
Both ``onPush()`` and ``onPull()`` calls ``emitChunkOrPull()`` the only difference is that the push handler also stores
Both ``onPush()`` and ``onPull()`` call ``emitChunk()``; the only difference is that the push handler also stores
the incoming chunk by appending it to the end of the buffer.
.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeByteStrings.java#bytestring-chunker
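
A hedged usage sketch (the input values are only an illustration)::

    // emits chunks of at most two bytes each: [1,2], [3,4], [5]
    final Source<ByteString, NotUsed> chunks =
      Source.single(ByteString.fromArray(new byte[] { 1, 2, 3, 4, 5 }))
        .via(new Chunker(2));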
@ -363,7 +358,7 @@ Limit the number of bytes passing through a stream of ByteStrings
**Situation:** Given a stream of ByteStrings we want to fail the stream if more than a given maximum of bytes has been
consumed.
This recipe uses a :class:`PushStage` to implement the desired feature. In the only handler we override,
This recipe uses a :class:`GraphStage` to implement the desired feature. In the ``onPush()`` handler we update a counter
and check whether it exceeds ``maximumBytes``. If it does, we fail the stage; otherwise we forward the chunk we have
received.
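
If it is available in your Akka version, the built-in ``limitWeighted`` operator is a related alternative (a hedged sketch; note that it fails the stream with a ``StreamLimitReachedException`` rather than the ``IllegalStateException`` used in the recipe)::

    final Flow<ByteString, ByteString, NotUsed> limiter =
      Flow.of(ByteString.class).limitWeighted(SIZE_LIMIT, bytes -> (long) bytes.size());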

View file

@ -36,9 +36,8 @@ Graph
is running.
Processing Stage
The common name for all building blocks that build up a Graph.
Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like
:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``.
For the full list of built-in processing stages see :ref:`stages-overview_java`
Examples of a processing stage would be operations like ``map()``, ``filter()``, custom ``GraphStage`` implementations and graph
junctions like ``Merge`` or ``Broadcast``. For the full list of built-in processing stages see :ref:`stages-overview_java`.
When we talk about *asynchronous, non-blocking backpressure* we mean that the processing stages available in Akka
Streams will not use blocking calls but asynchronous message passing to exchange messages between each other, and they

View file

@ -86,16 +86,11 @@ it makes sense to make the Server initiate the conversation by emitting a "hello
.. includecode:: ../code/docs/stream/io/StreamTcpDocTest.java#welcome-banner-chat-server
The way we constructed a :class:`Flow` using the :class:`GraphDSL` is explained in detail in
:ref:`constructing-sources-sinks-flows-from-partial-graphs-java`, however the basic concepts is rather simple
we can encapsulate arbitrarily complex logic within a :class:`Flow` as long as it exposes the same interface, which means
exposing exactly one :class:`Outlet` and exactly one :class:`Inlet` which will be connected to the TCP
pipeline. In this example we use a :class:`Concat` graph processing stage to inject the initial message, and then
continue with handling all incoming data using the echo handler. You should use this pattern of encapsulating complex
logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom and possibly sophisticated TCP servers.
To emit the initial message we merge a ``Source`` with a single element, after the command processing but before the
framing and transformation to ``ByteStrings``; this way we do not have to repeat such logic.
In this example both client and server may need to close the stream based on a parsed command - ``BYE`` in the case
of the server, and ``q`` in the case of the client. This is implemented by using a custom :class:`PushStage`
of the server, and ``q`` in the case of the client. This is implemented with ``takeWhile``,
which completes the stream once it encounters such a command.
Streaming File IO

View file

@ -11,7 +11,6 @@ import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import akka.stream.stage.{ Context, PushStage }
import akka.testkit.TestActors
import org.scalatest.{ Matchers, WordSpec }
import scala.language.postfixOps
@ -107,17 +106,9 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props)
val reactToTopLevelFailures = Flow[IncomingConnection]
.transform { () =>
new PushStage[IncomingConnection, IncomingConnection] {
override def onPush(elem: IncomingConnection, ctx: Context[IncomingConnection]) =
ctx.push(elem)
override def onUpstreamFailure(cause: Throwable, ctx: Context[IncomingConnection]) = {
failureMonitor ! cause
super.onUpstreamFailure(cause, ctx)
}
}
}
.watchTermination()((_, termination) => termination.onFailure {
case cause => failureMonitor ! cause
})
serverSource
.via(reactToTopLevelFailures)
@ -134,16 +125,10 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
val serverSource = Http().bind(host, port)
val reactToConnectionFailure = Flow[HttpRequest]
.transform { () =>
new PushStage[HttpRequest, HttpRequest] {
override def onPush(elem: HttpRequest, ctx: Context[HttpRequest]) =
ctx.push(elem)
override def onUpstreamFailure(cause: Throwable, ctx: Context[HttpRequest]) = {
// handle the failure somehow
super.onUpstreamFailure(cause, ctx)
}
}
.recover[HttpRequest] {
case ex =>
// handle the failure somehow
throw ex
}
val httpEcho = Flow[HttpRequest]

View file

@ -66,52 +66,68 @@ object BidiFlowDocSpec {
ByteString.newBuilder.putInt(len).append(bytes).result()
}
class FrameParser extends PushPullStage[ByteString, ByteString] {
// this holds the received but not yet parsed bytes
var stash = ByteString.empty
// this holds the current message length or -1 if at a boundary
var needed = -1
class FrameParser extends GraphStage[FlowShape[ByteString, ByteString]] {
override def onPush(bytes: ByteString, ctx: Context[ByteString]) = {
stash ++= bytes
run(ctx)
}
override def onPull(ctx: Context[ByteString]) = run(ctx)
override def onUpstreamFinish(ctx: Context[ByteString]) =
if (stash.isEmpty) ctx.finish()
else ctx.absorbTermination() // we still have bytes to emit
val in = Inlet[ByteString]("FrameParser.in")
val out = Outlet[ByteString]("FrameParser.out")
override val shape = FlowShape.of(in, out)
private def run(ctx: Context[ByteString]): SyncDirective =
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) pullOrFinish(ctx)
else {
needed = stash.iterator.getInt
stash = stash.drop(4)
run(ctx) // cycle back to possibly already emit the next chunk
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
// this holds the received but not yet parsed bytes
var stash = ByteString.empty
// this holds the current message length or -1 if at a boundary
var needed = -1
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (isClosed(in)) run()
else pull(in)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val bytes = grab(in)
stash = stash ++ bytes
run()
}
} else if (stash.length < needed) {
// we are in the middle of a message, need more bytes
pullOrFinish(ctx)
} else {
// we have enough to emit at least one message, so do it
val emit = stash.take(needed)
stash = stash.drop(needed)
needed = -1
ctx.push(emit)
}
/*
* After having called absorbTermination() we cannot pull any more, so if we need
* more data we will just have to give up.
*/
private def pullOrFinish(ctx: Context[ByteString]) =
if (ctx.isFinishing) ctx.finish()
else ctx.pull()
override def onUpstreamFinish(): Unit = {
if (stash.isEmpty) completeStage()
// wait with completion and let run() complete when the
// rest of the stash has been sent downstream
}
})
private def run(): Unit = {
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) {
if (isClosed(in)) completeStage()
else pull(in)
} else {
needed = stash.iterator.getInt
stash = stash.drop(4)
run() // cycle back to possibly already emit the next chunk
}
} else if (stash.length < needed) {
// we are in the middle of a message, need more bytes,
// or have to stop if input closed
if (isClosed(in)) completeStage()
else pull(in)
} else {
// we have enough to emit at least one message, so do it
val emit = stash.take(needed)
stash = stash.drop(needed)
needed = -1
push(out, emit)
}
}
}
}
val outbound = b.add(Flow[ByteString].map(addLengthHeader))
val inbound = b.add(Flow[ByteString].transform(() => new FrameParser))
val inbound = b.add(Flow[ByteString].via(new FrameParser))
BidiShape.fromFlows(outbound, inbound)
})
//#framing

View file

@ -1,192 +0,0 @@
package docs.stream
import akka.stream._
import akka.stream.scaladsl.{ Sink, Source, Flow, Keep }
import akka.stream.testkit.AkkaSpec
import org.scalatest.concurrent.ScalaFutures
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#import-stage
import akka.stream.stage._
//#import-stage
implicit val materializer = ActorMaterializer()
"stages demo" must {
"demonstrate various PushPullStages" in {
//#one-to-one
class Map[A, B](f: A => B) extends PushPullStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
override def onPull(ctx: Context[B]): SyncDirective =
ctx.pull()
}
//#one-to-one
//#many-to-one
class Filter[A](p: A => Boolean) extends PushPullStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
override def onPull(ctx: Context[A]): SyncDirective =
ctx.pull()
}
//#many-to-one
//#one-to-many
class Duplicator[A]() extends PushPullStage[A, A] {
private var lastElem: A = _
private var oneLeft = false
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
lastElem = elem
oneLeft = true
ctx.push(elem)
}
override def onPull(ctx: Context[A]): SyncDirective =
if (!ctx.isFinishing) {
// the main pulling logic is below as it is demonstrated on the illustration
if (oneLeft) {
oneLeft = false
ctx.push(lastElem)
} else
ctx.pull()
} else {
// If we need to emit a final element after the upstream
// finished
if (oneLeft) ctx.pushAndFinish(lastElem)
else ctx.finish()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
//#one-to-many
val keyedSink = Sink.head[immutable.Seq[Int]]
val sink = Flow[Int].grouped(10).toMat(keyedSink)(Keep.right)
//#stage-chain
val resultFuture = Source(1 to 10)
.transform(() => new Filter(_ % 2 == 0))
.transform(() => new Duplicator())
.transform(() => new Map(_ / 2))
.runWith(sink)
//#stage-chain
Await.result(resultFuture, 3.seconds) should be(Seq(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))
}
"demonstrate various PushStages" in {
import akka.stream.stage._
//#pushstage
class Map[A, B](f: A => B) extends PushStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
}
class Filter[A](p: A => Boolean) extends PushStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
}
//#pushstage
}
"demonstrate GraphStage" in {
//#doubler-stateful
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Duplicator.in")
val out = Outlet[A]("Duplicator.out")
val shape: FlowShape[A, A] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
emitMultiple(out, List(elem, elem))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
//#doubler-stateful
val duplicator = Flow.fromGraph(new Duplicator[Int])
val fold = Source(1 to 2).via(duplicator).runFold("")(_ + _)
whenReady(fold) { s =>
s should be("1122")
}
}
"demonstrate DetachedStage" in {
//#detached
class Buffer2[T]() extends DetachedStage[T, T] {
private var buf = Vector.empty[T]
private var capacity = 2
private def isFull = capacity == 0
private def isEmpty = capacity == 2
private def dequeue(): T = {
capacity += 1
val next = buf.head
buf = buf.tail
next
}
private def enqueue(elem: T) = {
capacity -= 1
buf = buf :+ elem
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (isEmpty) {
if (ctx.isFinishing) ctx.finish() // No more elements will arrive
else ctx.holdDownstream() // waiting until new elements
} else {
val next = dequeue()
if (ctx.isHoldingUpstream) ctx.pushAndPull(next) // release upstream
else ctx.push(next)
}
}
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
enqueue(elem)
if (isFull) ctx.holdUpstream() // Queue is now full, wait until new empty slot
else {
if (ctx.isHoldingDownstream) ctx.pushAndPull(dequeue()) // Release downstream
else ctx.pull()
}
}
override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = {
if (!isEmpty) ctx.absorbTermination() // still need to flush from buffer
else ctx.finish() // already empty, finishing
}
}
//#detached
}
}
}

View file

@ -9,6 +9,7 @@ import akka.stream.stage._
import akka.stream._
import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec }
import akka.testkit.TestLatch
import scala.collection.mutable
import scala.concurrent.{ Promise, Await, Future }
@@ -271,6 +272,7 @@ class GraphStageDocSpec extends AkkaSpec {
"Demonstrate an asynchronous side channel" in {
import system.dispatcher
//#async-side-channel
// will close upstream when the future completes
class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
@@ -301,20 +303,31 @@ class GraphStageDocSpec extends AkkaSpec {
//#async-side-channel
// tests:
val switch = Promise[Unit]()
val duplicator = Flow.fromGraph(new KillSwitch[Int](switch.future))
// TODO this is probably racey, is there a way to make sure it happens after?
val valueAfterKill = switch.future.flatMap(_ => Future(4))
val in = TestPublisher.probe[Int]()
val out = TestSubscriber.probe[Int]()
val result =
Source(Vector(1, 2, 3)).concat(Source.fromFuture(valueAfterKill))
.via(duplicator)
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Source.fromPublisher(in)
.via(duplicator)
.to(Sink.fromSubscriber(out))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val sub = in.expectSubscription()
out.request(1)
sub.expectRequest()
sub.sendNext(1)
out.expectNext(1)
switch.success(Unit)
Await.result(result, 3.seconds) should ===(Seq(1, 2, 3))
out.expectComplete()
}
"Demonstrate a graph stage with a timer" in {

View file

@@ -1,6 +1,7 @@
package docs.stream.cookbook
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString
@@ -18,34 +19,49 @@ class RecipeByteStrings extends RecipeSpec {
//#bytestring-chunker
import akka.stream.stage._
class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
private var buffer = ByteString.empty
class Chunker(val chunkSize: Int) extends GraphStage[FlowShape[ByteString, ByteString]] {
val in = Inlet[ByteString]("Chunker.in")
val out = Outlet[ByteString]("Chunker.out")
override val shape = FlowShape.of(in, out)
override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
buffer ++= elem
emitChunkOrPull(ctx)
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var buffer = ByteString.empty
override def onPull(ctx: Context[ByteString]): SyncDirective = emitChunkOrPull(ctx)
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (isClosed(in)) emitChunk()
else pull(in)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer ++= elem
emitChunk()
}
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective =
if (buffer.nonEmpty) ctx.absorbTermination()
else ctx.finish()
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty) completeStage()
// elements left in buffer, keep accepting downstream pulls
// and push from buffer until buffer is emitted
}
})
private def emitChunkOrPull(ctx: Context[ByteString]): SyncDirective = {
if (buffer.isEmpty) {
if (ctx.isFinishing) ctx.finish()
else ctx.pull()
} else {
val (emit, nextBuffer) = buffer.splitAt(chunkSize)
buffer = nextBuffer
ctx.push(emit)
private def emitChunk(): Unit = {
if (buffer.isEmpty) {
if (isClosed(in)) completeStage()
else pull(in)
} else {
val (chunk, nextBuffer) = buffer.splitAt(chunkSize)
buffer = nextBuffer
push(out, chunk)
}
}
}
}
}
val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))
val chunksStream = rawBytes.via(new Chunker(ChunkLimit))
//#bytestring-chunker
val chunksFuture = chunksStream.limit(10).runWith(Sink.seq)
@@ -61,17 +77,31 @@ class RecipeByteStrings extends RecipeSpec {
//#bytes-limiter
import akka.stream.stage._
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
private var count = 0
class ByteLimiter(val maximumBytes: Long) extends GraphStage[FlowShape[ByteString, ByteString]] {
val in = Inlet[ByteString]("ByteLimiter.in")
val out = Outlet[ByteString]("ByteLimiter.out")
override val shape = FlowShape.of(in, out)
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
count += chunk.size
if (count > maximumBytes) ctx.fail(new IllegalStateException("Too much bytes"))
else ctx.push(chunk)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var count = 0
setHandlers(in, out, new InHandler with OutHandler {
override def onPull(): Unit = {
pull(in)
}
override def onPush(): Unit = {
val chunk = grab(in)
count += chunk.size
if (count > maximumBytes) failStage(new IllegalStateException("Too much bytes"))
else push(out, chunk)
}
})
}
}
val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit))
val limiter = Flow[ByteString].via(new ByteLimiter(SizeLimit))
//#bytes-limiter
val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))

View file

@@ -3,6 +3,7 @@ package docs.stream.cookbook
import java.security.MessageDigest
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
@@ -21,28 +22,36 @@ class RecipeDigest extends RecipeSpec {
//#calculating-digest
import akka.stream.stage._
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
val digest = MessageDigest.getInstance(algorithm)
class DigestCalculator(algorithm: String) extends GraphStage[FlowShape[ByteString, ByteString]] {
val in = Inlet[ByteString]("DigestCalculator.in")
val out = Outlet[ByteString]("DigestCalculator.out")
override val shape = FlowShape.of(in, out)
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
digest.update(chunk.toArray)
ctx.pull()
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val digest = MessageDigest.getInstance(algorithm)
override def onPull(ctx: Context[ByteString]): SyncDirective = {
if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
else ctx.pull()
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val chunk = grab(in)
digest.update(chunk.toArray)
pull(in)
}
override def onUpstreamFinish(): Unit = {
emit(out, ByteString(digest.digest()))
completeStage()
}
})
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
// If the stream is finished, we need to emit the last element in the onPull block.
// It is not allowed to directly emit elements from a termination block
// (onUpstreamFinish or onUpstreamFailure)
ctx.absorbTermination()
}
}
val digest: Source[ByteString, NotUsed] = data.transform(() => digestCalculator("SHA-256"))
val digest: Source[ByteString, NotUsed] = data.via(new DigestCalculator("SHA-256"))
//#calculating-digest
Await.result(digest.runWith(Sink.head), 3.seconds) should be(

View file

@@ -1,5 +1,6 @@
package docs.stream.cookbook
import akka.stream.Attributes
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit._
@@ -7,40 +8,68 @@ import scala.concurrent.duration._
object HoldOps {
//#hold-version-1
import akka.stream._
import akka.stream.stage._
class HoldWithInitial[T](initial: T) extends DetachedStage[T, T] {
private var currentValue: T = initial
final class HoldWithInitial[T](initial: T) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("HoldWithInitial.in")
val out = Outlet[T]("HoldWithInitial.out")
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
ctx.pull()
}
override val shape = FlowShape.of(in, out)
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
ctx.push(currentValue)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var currentValue: T = initial
setHandlers(in, out, new InHandler with OutHandler {
override def onPush(): Unit = {
currentValue = grab(in)
pull(in)
}
override def onPull(): Unit = {
push(out, currentValue)
}
})
override def preStart(): Unit = {
pull(in)
}
}
}
//#hold-version-1
//#hold-version-2
import akka.stream._
import akka.stream.stage._
class HoldWithWait[T] extends DetachedStage[T, T] {
private var currentValue: T = _
private var waitingFirstValue = true
final class HoldWithWait[T] extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("HoldWithWait.in")
val out = Outlet[T]("HoldWithWait.out")
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
waitingFirstValue = false
if (ctx.isHoldingDownstream) ctx.pushAndPull(currentValue)
else ctx.pull()
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var currentValue: T = _
private var waitingFirstValue = true
setHandlers(in, out, new InHandler with OutHandler {
override def onPush(): Unit = {
currentValue = grab(in)
if (waitingFirstValue) {
waitingFirstValue = false
if (isAvailable(out)) push(out, currentValue)
}
pull(in)
}
override def onPull(): Unit = {
if (!waitingFirstValue) push(out, currentValue)
}
})
override def preStart(): Unit = {
pull(in)
}
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (waitingFirstValue) ctx.holdDownstream()
else ctx.push(currentValue)
}
}
//#hold-version-2
}
@@ -57,7 +86,9 @@ class RecipeHold extends RecipeSpec {
val source = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
source.transform(() => new HoldWithInitial(0)).to(sink).run()
source.via(new HoldWithInitial(0)).to(sink)
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
@@ -87,7 +118,7 @@ class RecipeHold extends RecipeSpec {
val source = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
source.transform(() => new HoldWithWait).to(sink).run()
source.via(new HoldWithWait).to(sink).run()
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)

View file

@@ -8,7 +8,6 @@ import java.util.concurrent.atomic.AtomicReference
import akka.stream._
import akka.stream.scaladsl.Tcp._
import akka.stream.scaladsl._
import akka.stream.stage.{ Context, PushStage, SyncDirective }
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.util.ByteString
@@ -70,46 +69,29 @@ class StreamTcpDocSpec extends AkkaSpec {
import akka.stream.io.Framing
//#welcome-banner-chat-server
connections runForeach { connection =>
connections.runForeach { connection =>
val serverLogic = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// server logic, parses incoming commands
val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")
// server logic, parses incoming commands
val commandParser = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]): SyncDirective = {
elem match {
case "BYE" ctx.finish()
case _ ctx.push(elem + "!")
}
}
}
import connection._
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
val welcome = Source.single(welcomeMsg)
import connection._
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!\n"
val welcome = Source.single(ByteString(welcomeMsg))
val echo = b.add(Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
//#welcome-banner-chat-server
.map { command ⇒ serverProbe.ref ! command; command }
//#welcome-banner-chat-server
.transform(() ⇒ commandParser)
.map(_ + "\n")
.map(ByteString(_)))
val concat = b.add(Concat[ByteString]())
// first we emit the welcome message,
welcome ~> concat.in(0)
// then we continue using the echo-logic Flow
echo.outlet ~> concat.in(1)
FlowShape(echo.in, concat.out)
})
val serverLogic = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
//#welcome-banner-chat-server
.map { command ⇒ serverProbe.ref ! command; command }
//#welcome-banner-chat-server
.via(commandParser)
// merge in the initial banner after parser
.merge(welcome)
.map(_ + "\n")
.map(ByteString(_))
connection.handleWith(serverLogic)
}
@@ -135,14 +117,10 @@ class StreamTcpDocSpec extends AkkaSpec {
val connection = Tcp().outgoingConnection(localhost)
//#repl-client
val replParser = new PushStage[String, ByteString] {
override def onPush(elem: String, ctx: Context[ByteString]): SyncDirective = {
elem match {
case "q" ctx.pushAndFinish(ByteString("BYE\n"))
case _ ctx.push(ByteString(s"$elem\n"))
}
}
}
val replParser =
Flow[String].takeWhile(_ != "q")
.concat(Source.single("BYE"))
.map(elem => ByteString(s"$elem\n"))
val repl = Flow[ByteString]
.via(Framing.delimiter(
@@ -152,7 +130,7 @@ class StreamTcpDocSpec extends AkkaSpec {
.map(_.utf8String)
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
.transform(() ⇒ replParser)
.via(replParser)
connection.join(repl).run()
}

View file

@@ -213,7 +213,7 @@ through the stream starting from the stage which failed, all the way downstream
Connections Source failures
^^^^^^^^^^^^^^^^^^^^^^^^^^^
In the example below we add a custom ``PushStage`` (see :ref:`stream-customize-scala`) in order to react to the
In the example below we add a custom ``GraphStage`` (see :ref:`stream-customize-scala`) in order to react to the
stream's failure. We signal a ``failureMonitor`` actor with the cause why the stream is going down, and let the Actor
handle the rest: maybe it'll decide to restart the server or shut down the ActorSystem; that, however, is not our concern anymore.
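The actual stage used by the sample is not reproduced in this diff, but a minimal sketch of the idea could look as follows (the class name ``FailureMonitor`` and the raw ``Throwable`` message are illustrative assumptions, not the exact sample from the docs): the stage passes elements through unchanged and overrides ``onUpstreamFailure`` to tell the monitoring actor why the stream is failing before propagating the failure::

  import akka.actor.ActorRef
  import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
  import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

  class FailureMonitor[A](monitor: ActorRef) extends GraphStage[FlowShape[A, A]] {
    val in = Inlet[A]("FailureMonitor.in")
    val out = Outlet[A]("FailureMonitor.out")
    override val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandlers(in, out, new InHandler with OutHandler {
          override def onPush(): Unit = push(out, grab(in))
          override def onPull(): Unit = pull(in)
          override def onUpstreamFailure(ex: Throwable): Unit = {
            monitor ! ex   // report the cause to the monitoring actor
            failStage(ex)  // then fail the stage as usual
          }
        })
      }
  }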

View file

@@ -75,17 +75,15 @@ Calculating the digest of a ByteString stream
**Situation:** A stream of bytes is given as a stream of ``ByteStrings`` and we want to calculate the cryptographic digest
of the stream.
This recipe uses a :class:`PushPullStage` to host a mutable :class:`MessageDigest` class (part of the Java Cryptography
This recipe uses a :class:`GraphStage` to host a mutable :class:`MessageDigest` class (part of the Java Cryptography
API) and update it with the bytes arriving from the stream. When the stream starts, the ``onPull`` handler of the
stage is called, which just bubbles up the ``pull`` event to its upstream. As a response to this pull, a ByteString
chunk will arrive (``onPush``) which we use to update the digest, then it will pull for the next chunk.
Eventually the stream of ``ByteStrings`` depletes and we get a notification about this event via ``onUpstreamFinish``.
At this point we want to emit the digest value, but we cannot do it in this handler directly. Instead we call
``ctx.absorbTermination()`` signalling to our context that we do not yet want to finish. When the environment decides that
we can emit further elements ``onPull`` is called again, and we see ``ctx.isFinishing`` returning ``true`` (since the upstream
source has been depleted already). Since we only want to emit a final element it is enough to call ``ctx.pushAndFinish``
passing the digest ByteString to be emitted.
At this point we want to emit the digest value, but we cannot do it with ``push`` in this handler directly since there may
be no downstream demand. Instead we call ``emit``, which will temporarily replace the handlers, emit the provided value when
demand comes in, and then reset the stage state. After that the stage is completed.
.. includecode:: ../code/docs/stream/cookbook/RecipeDigest.scala#calculating-digest
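For orientation, a minimal usage sketch in the spirit of the recipe's test (the sample data is made up and an implicit materializer is assumed to be in scope)::

  import akka.NotUsed
  import akka.stream.scaladsl.{ Sink, Source }
  import akka.util.ByteString

  val data: Source[ByteString, NotUsed] = Source.single(ByteString("abc"))

  val digest: Source[ByteString, NotUsed] = data.via(new DigestCalculator("SHA-256"))

  // the single emitted element is the SHA-256 digest of all bytes seen upstream
  val result = digest.runWith(Sink.head)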
@@ -271,14 +269,11 @@ Create a stream processor that repeats the last element seen
of them is slowing down the other by dropping earlier unconsumed elements from the upstream if necessary, and repeating
the last value for the downstream if necessary.
We have two options to implement this feature. In both cases we will use :class:`DetachedStage` to build our custom
element (:class:`DetachedStage` is specifically designed for rate translating elements just like ``conflate``,
``expand`` or ``buffer``). In the first version we will use a provided initial value ``initial`` that will be used
We have two options to implement this feature. In both cases we will use :class:`GraphStage` to build our custom
element. In the first version we will use a provided initial value ``initial`` that will be used
to feed the downstream if no upstream element is ready yet. In the ``onPush()`` handler we just overwrite the
``currentValue`` variable and immediately relieve the upstream by calling ``pull()`` (remember, implementations of
:class:`DetachedStage` are not allowed to call ``push()`` as a response to ``onPush()`` or call ``pull()`` as a response
of ``onPull()``). The downstream ``onPull`` handler is very similar, we immediately relieve the downstream by
emitting ``currentValue``.
``currentValue`` variable and immediately relieve the upstream by calling ``pull()``. The downstream ``onPull`` handler
is very similar: we immediately relieve the downstream by emitting ``currentValue``.
.. includecode:: ../code/docs/stream/cookbook/RecipeHold.scala#hold-version-1
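As a rough usage sketch (not part of the recipe itself; the sources, the ``ActorSystem`` name and the printing sink are illustrative): combined with a periodic tick, the stage decouples a slow producer from a much faster consumer, which simply sees the latest value repeated::

  import scala.concurrent.duration._
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Source

  implicit val system = ActorSystem("hold-example")
  implicit val mat = ActorMaterializer()

  // slow producer: 0, 1, 2, ... once per second
  val slow = Source.tick(1.second, 1.second, 1).scan(0)(_ + _)

  // a ten-times-faster consumer sees the initial 0 first and then each value
  // repeated roughly ten times (internal buffering may add a slight lag)
  Source.tick(100.millis, 100.millis, ())
    .zip(slow.via(new HoldWithInitial(0)))
    .map { case (_, value) => value }
    .runForeach(println)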
@@ -289,9 +284,9 @@ case: if the very first element is not yet available.
We introduce a boolean variable ``waitingFirstValue`` to denote whether the first element has been provided or not
(alternatively an :class:`Option` can be used for ``currentValue`` or if the element type is a subclass of AnyRef
a null can be used with the same purpose). In the downstream ``onPull()`` handler the difference from the previous
version is that we call ``holdDownstream()`` if the first element is not yet available and thus blocking our downstream. The
upstream ``onPush()`` handler sets ``waitingFirstValue`` to false, and after checking if ``holdDownstream()`` has been called it
either relieves the upstream producer, or both the upstream producer and downstream consumer by calling ``pushAndPull()``
version is that we check whether we have received the first value, and only emit if we have. As a consequence, when the
first element comes in we must check whether there already is demand from downstream, and if so push the element
directly.
.. includecode:: ../code/docs/stream/cookbook/RecipeHold.scala#hold-version-2
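The ``Option`` based alternative mentioned above would look roughly like this (a sketch only; ``HoldWithWaitOption`` is a made-up name and the handler logic simply mirrors the recipe)::

  import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
  import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

  final class HoldWithWaitOption[T] extends GraphStage[FlowShape[T, T]] {
    val in = Inlet[T]("HoldWithWaitOption.in")
    val out = Outlet[T]("HoldWithWaitOption.out")
    override val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        private var currentValue: Option[T] = None

        setHandlers(in, out, new InHandler with OutHandler {
          override def onPush(): Unit = {
            val firstValue = currentValue.isEmpty
            currentValue = Some(grab(in))
            // if downstream asked for an element while we were still waiting
            // for the first value, satisfy that demand now
            if (firstValue && isAvailable(out)) push(out, currentValue.get)
            pull(in)
          }
          override def onPull(): Unit =
            currentValue.foreach(v => push(out, v))
        })

        override def preStart(): Unit = pull(in)
      }
  }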
@@ -336,14 +331,14 @@ Chunking up a stream of ByteStrings into limited size ByteStrings
the same sequence, but capping the size of ByteStrings. In other words we want to slice up ByteStrings into smaller
chunks if they exceed a size threshold.
This can be achieved with a single :class:`PushPullStage`. The main logic of our stage is in ``emitChunkOrPull()``
This can be achieved with a single :class:`GraphStage`. The main logic of our stage is in ``emitChunk()``
which implements the following logic:
* if the buffer is empty, we pull for more bytes
* if the buffer is empty and upstream is not closed, we pull for more bytes; if upstream is closed, we complete
* if the buffer is nonEmpty, we split it according to the ``chunkSize``. This will give a next chunk that we will emit,
and an empty or nonempty remaining buffer.
Both ``onPush()`` and ``onPull()`` calls ``emitChunkOrPull()`` the only difference is that the push handler also stores
Both ``onPush()`` and ``onPull()`` call ``emitChunk()``; the only difference is that the push handler also stores
the incoming chunk by appending to the end of the buffer.
.. includecode:: ../code/docs/stream/cookbook/RecipeByteStrings.scala#bytestring-chunker
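A minimal usage sketch mirroring the recipe's test (the sample data is made up and an implicit materializer is assumed to be in scope)::

  import akka.stream.scaladsl.{ Sink, Source }
  import akka.util.ByteString

  val rawBytes = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))

  // re-chunk into ByteStrings of at most two bytes each
  val chunksFuture = rawBytes.via(new Chunker(chunkSize = 2)).limit(10).runWith(Sink.seq)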
@@ -354,7 +349,7 @@ Limit the number of bytes passing through a stream of ByteStrings
**Situation:** Given a stream of ByteStrings we want to fail the stream if more than a given maximum of bytes has been
consumed.
This recipe uses a :class:`PushStage` to implement the desired feature. In the only handler we override,
This recipe uses a :class:`GraphStage` to implement the desired feature. In the only handler we override,
``onPush()``, we just update a counter and see if it gets larger than ``maximumBytes``. If a violation happens
we signal failure, otherwise we forward the chunk we have received.
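A minimal usage sketch along the lines of the recipe's test (the nine-byte limit and the sample data are made up, and an implicit materializer is assumed to be in scope)::

  import akka.stream.scaladsl.{ Flow, Sink, Source }
  import akka.util.ByteString

  val limiter = Flow[ByteString].via(new ByteLimiter(maximumBytes = 9))

  val bytes = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))

  // exactly 9 bytes in total: all chunks pass through untouched
  val ok = bytes.via(limiter).runWith(Sink.seq)

  // one byte over the limit: the stream fails with an IllegalStateException
  val tooMany = bytes.concat(Source.single(ByteString(10))).via(limiter).runWith(Sink.seq)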

Some files were not shown because too many files have changed in this diff.