ActorContext.getParent for Java API #22413
This commit is contained in:
parent
5ff44194a3
commit
2eb226ed32
213 changed files with 1319 additions and 1523 deletions
|
|
@ -0,0 +1,271 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.stage.*;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.util.ByteString;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class RecipeByteStrings extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeByteStrings");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
|
||||
final Source<ByteString, NotUsed> rawBytes = Source.from(Arrays.asList(
|
||||
ByteString.fromArray(new byte[] { 1, 2 }),
|
||||
ByteString.fromArray(new byte[] { 3 }),
|
||||
ByteString.fromArray(new byte[] { 4, 5, 6 }),
|
||||
ByteString.fromArray(new byte[] { 7, 8, 9 })));
|
||||
|
||||
@Test
|
||||
public void chunker() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
final int CHUNK_LIMIT = 2;
|
||||
|
||||
//#bytestring-chunker
|
||||
class Chunker extends GraphStage<FlowShape<ByteString, ByteString>> {
|
||||
|
||||
private final int chunkSize;
|
||||
|
||||
public Inlet<ByteString> in = Inlet.<ByteString>create("Chunker.in");
|
||||
public Outlet<ByteString> out = Outlet.<ByteString>create("Chunker.out");
|
||||
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
|
||||
|
||||
public Chunker(int chunkSize) {
|
||||
this.chunkSize = chunkSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowShape<ByteString, ByteString> shape() {
|
||||
return shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
|
||||
return new GraphStageLogic(shape) {
|
||||
private ByteString buffer = ByteString.empty();
|
||||
|
||||
{
|
||||
setHandler(out, new AbstractOutHandler(){
|
||||
@Override
|
||||
public void onPull() throws Exception {
|
||||
if (isClosed(in)) emitChunk();
|
||||
else pull(in);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
setHandler(in, new AbstractInHandler() {
|
||||
|
||||
@Override
|
||||
public void onPush() throws Exception {
|
||||
ByteString elem = grab(in);
|
||||
buffer = buffer.concat(elem);
|
||||
emitChunk();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpstreamFinish() throws Exception {
|
||||
if (buffer.isEmpty()) completeStage();
|
||||
else {
|
||||
// There are elements left in buffer, so
|
||||
// we keep accepting downstream pulls and push from buffer until emptied.
|
||||
//
|
||||
// It might be though, that the upstream finished while it was pulled, in which
|
||||
// case we will not get an onPull from the downstream, because we already had one.
|
||||
// In that case we need to emit from the buffer.
|
||||
if (isAvailable(out)) emitChunk();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void emitChunk() {
|
||||
if (buffer.isEmpty()) {
|
||||
if (isClosed(in)) completeStage();
|
||||
else pull(in);
|
||||
} else {
|
||||
Tuple2<ByteString, ByteString> split = buffer.splitAt(chunkSize);
|
||||
ByteString chunk = split._1();
|
||||
buffer = split._2();
|
||||
push(out, chunk);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
//#bytestring-chunker
|
||||
|
||||
{
|
||||
//#bytestring-chunker2
|
||||
Source<ByteString, NotUsed> chunksStream =
|
||||
rawBytes.via(new Chunker(CHUNK_LIMIT));
|
||||
//#bytestring-chunker2
|
||||
|
||||
CompletionStage<List<ByteString>> chunksFuture = chunksStream.limit(10).runWith(Sink.seq(), mat);
|
||||
|
||||
List<ByteString> chunks = chunksFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
||||
for (ByteString chunk : chunks) {
|
||||
assertTrue(chunk.size() <= 2);
|
||||
}
|
||||
|
||||
ByteString sum = ByteString.empty();
|
||||
for (ByteString chunk : chunks) {
|
||||
sum = sum.concat(chunk);
|
||||
}
|
||||
assertEquals(sum, ByteString.fromArray(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void limiterShouldWork() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
final int SIZE_LIMIT = 9;
|
||||
|
||||
//#bytes-limiter
|
||||
class ByteLimiter extends GraphStage<FlowShape<ByteString, ByteString>> {
|
||||
|
||||
final long maximumBytes;
|
||||
|
||||
public Inlet<ByteString> in = Inlet.<ByteString>create("ByteLimiter.in");
|
||||
public Outlet<ByteString> out = Outlet.<ByteString>create("ByteLimiter.out");
|
||||
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
|
||||
|
||||
public ByteLimiter(long maximumBytes) {
|
||||
this.maximumBytes = maximumBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowShape<ByteString, ByteString> shape() {
|
||||
return shape;
|
||||
}
|
||||
@Override
|
||||
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
|
||||
return new GraphStageLogic(shape) {
|
||||
private int count = 0;
|
||||
|
||||
{
|
||||
setHandler(out, new AbstractOutHandler() {
|
||||
@Override
|
||||
public void onPull() throws Exception {
|
||||
pull(in);
|
||||
}
|
||||
});
|
||||
setHandler(in, new AbstractInHandler() {
|
||||
@Override
|
||||
public void onPush() throws Exception {
|
||||
ByteString chunk = grab(in);
|
||||
count += chunk.size();
|
||||
if (count > maximumBytes) {
|
||||
failStage(new IllegalStateException("Too much bytes"));
|
||||
} else {
|
||||
push(out, chunk);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
//#bytes-limiter
|
||||
|
||||
{
|
||||
//#bytes-limiter2
|
||||
Flow<ByteString, ByteString, NotUsed> limiter =
|
||||
Flow.of(ByteString.class).via(new ByteLimiter(SIZE_LIMIT));
|
||||
//#bytes-limiter2
|
||||
|
||||
final Source<ByteString, NotUsed> bytes1 = Source.from(Arrays.asList(
|
||||
ByteString.fromArray(new byte[] { 1, 2 }),
|
||||
ByteString.fromArray(new byte[] { 3 }),
|
||||
ByteString.fromArray(new byte[] { 4, 5, 6 }),
|
||||
ByteString.fromArray(new byte[] { 7, 8, 9 })));
|
||||
|
||||
final Source<ByteString, NotUsed> bytes2 = Source.from(Arrays.asList(
|
||||
ByteString.fromArray(new byte[] { 1, 2 }),
|
||||
ByteString.fromArray(new byte[] { 3 }),
|
||||
ByteString.fromArray(new byte[] { 4, 5, 6 }),
|
||||
ByteString.fromArray(new byte[] { 7, 8, 9, 10 })));
|
||||
|
||||
List<ByteString> got = bytes1.via(limiter).limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
ByteString acc = ByteString.empty();
|
||||
for (ByteString b : got) {
|
||||
acc = acc.concat(b);
|
||||
}
|
||||
assertEquals(acc, ByteString.fromArray(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
|
||||
|
||||
boolean thrown = false;
|
||||
try {
|
||||
bytes2.via(limiter).limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
} catch (ExecutionException ex) {
|
||||
assertEquals(ex.getCause().getClass(), IllegalStateException.class);
|
||||
thrown = true;
|
||||
}
|
||||
assertTrue("Expected IllegalStateException to be thrown", thrown);
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void compacting() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<ByteString, NotUsed> rawBytes = Source.from(Arrays.asList(
|
||||
ByteString.fromArray(new byte[] { 1, 2 }),
|
||||
ByteString.fromArray(new byte[] { 3 }),
|
||||
ByteString.fromArray(new byte[] { 4, 5, 6 }),
|
||||
ByteString.fromArray(new byte[] { 7, 8, 9 })));
|
||||
|
||||
//#compacting-bytestrings
|
||||
Source<ByteString, NotUsed> compacted = rawBytes.map(ByteString::compact);
|
||||
//#compacting-bytestrings
|
||||
|
||||
List<ByteString> got = compacted.limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
||||
for (ByteString byteString : got) {
|
||||
assertTrue(byteString.isCompact());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Compression;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.util.ByteString;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
public class RecipeDecompress extends RecipeTest {
|
||||
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeDecompress");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
private ByteString gzip(final String s) throws IOException {
|
||||
final ByteArrayOutputStream buf = new ByteArrayOutputStream();
|
||||
final GZIPOutputStream out = new GZIPOutputStream(buf);
|
||||
try {
|
||||
out.write(s.getBytes(StandardCharsets.UTF_8));
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
return ByteString.fromArray(buf.toByteArray());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseLines() throws Exception {
|
||||
final Source<ByteString, NotUsed> compressed = Source.single(gzip("Hello World"));
|
||||
|
||||
//#decompress-gzip
|
||||
final Source<String, NotUsed> uncompressed = compressed
|
||||
.via(Compression.gunzip(100))
|
||||
.map(b -> b.utf8String());
|
||||
//#decompress-gzip
|
||||
|
||||
uncompressed.runWith(Sink.head(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,129 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.stage.*;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.util.ByteString;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class RecipeDigest extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeDigest");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
|
||||
//#calculating-digest
|
||||
class DigestCalculator extends GraphStage<FlowShape<ByteString, ByteString>> {
|
||||
private final String algorithm;
|
||||
public Inlet<ByteString> in = Inlet.<ByteString>create("DigestCalculator.in");
|
||||
public Outlet<ByteString> out = Outlet.<ByteString>create("DigestCalculator.out");
|
||||
private FlowShape<ByteString, ByteString> shape = FlowShape.of(in, out);
|
||||
|
||||
public DigestCalculator(String algorithm) {
|
||||
this.algorithm = algorithm;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowShape<ByteString, ByteString> shape() {
|
||||
return shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
|
||||
return new GraphStageLogic(shape) {
|
||||
final MessageDigest digest;
|
||||
|
||||
{
|
||||
try {
|
||||
digest = MessageDigest.getInstance(algorithm);
|
||||
} catch(NoSuchAlgorithmException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
setHandler(out, new AbstractOutHandler() {
|
||||
@Override
|
||||
public void onPull() throws Exception {
|
||||
pull(in);
|
||||
}
|
||||
});
|
||||
setHandler(in, new AbstractInHandler() {
|
||||
@Override
|
||||
public void onPush() throws Exception {
|
||||
ByteString chunk = grab(in);
|
||||
digest.update(chunk.toArray());
|
||||
pull(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpstreamFinish() throws Exception {
|
||||
// If the stream is finished, we need to emit the digest
|
||||
// before completing
|
||||
emit(out, ByteString.fromArray(digest.digest()));
|
||||
completeStage();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
//#calculating-digest
|
||||
|
||||
@Test
|
||||
public void work() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
|
||||
{
|
||||
Source<ByteString, NotUsed> data = Source.from(Arrays.asList(
|
||||
ByteString.fromString("abcdbcdecdef"),
|
||||
ByteString.fromString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq")));
|
||||
|
||||
//#calculating-digest2
|
||||
final Source<ByteString, NotUsed> digest = data
|
||||
.via(new DigestCalculator("SHA-256"));
|
||||
//#calculating-digest2
|
||||
|
||||
ByteString got = digest.runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assertEquals(ByteString.fromInts(
|
||||
0x24, 0x8d, 0x6a, 0x61,
|
||||
0xd2, 0x06, 0x38, 0xb8,
|
||||
0xe5, 0xc0, 0x26, 0x93,
|
||||
0x0c, 0x3e, 0x60, 0x39,
|
||||
0xa3, 0x3c, 0xe4, 0x59,
|
||||
0x64, 0xff, 0x21, 0x67,
|
||||
0xf6, 0xec, 0xed, 0xd4,
|
||||
0x19, 0xdb, 0x06, 0xc1), got);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.Done;
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
||||
public class RecipeDroppyBroadcast extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeDroppyBroadcast");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void work() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
//#droppy-bcast
|
||||
// Makes a sink drop elements if too slow
|
||||
public <T> Sink<T, CompletionStage<Done>> droppySink(Sink<T, CompletionStage<Done>> sink, int size) {
|
||||
return Flow.<T> create()
|
||||
.buffer(size, OverflowStrategy.dropHead())
|
||||
.toMat(sink, Keep.right());
|
||||
}
|
||||
//#droppy-bcast
|
||||
|
||||
{
|
||||
final List<Integer> nums = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
nums.add(i + 1);
|
||||
}
|
||||
|
||||
final Sink<Integer, CompletionStage<Done>> mySink1 = Sink.ignore();
|
||||
final Sink<Integer, CompletionStage<Done>> mySink2 = Sink.ignore();
|
||||
final Sink<Integer, CompletionStage<Done>> mySink3 = Sink.ignore();
|
||||
|
||||
final Source<Integer, NotUsed> myData = Source.from(nums);
|
||||
|
||||
//#droppy-bcast2
|
||||
RunnableGraph.fromGraph(GraphDSL.create(builder -> {
|
||||
final int outputCount = 3;
|
||||
final UniformFanOutShape<Integer, Integer> bcast =
|
||||
builder.add(Broadcast.create(outputCount));
|
||||
builder.from(builder.add(myData)).toFanOut(bcast);
|
||||
builder.from(bcast).to(builder.add(droppySink(mySink1, 10)));
|
||||
builder.from(bcast).to(builder.add(droppySink(mySink2, 10)));
|
||||
builder.from(bcast).to(builder.add(droppySink(mySink3, 10)));
|
||||
return ClosedShape.getInstance();
|
||||
}));
|
||||
//#droppy-bcast2
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class RecipeFlattenList extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeFlattenList");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void workWithMapConcat() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
Source<List<Message>, NotUsed> someDataSource = Source
|
||||
.from(Arrays.asList(Arrays.asList(new Message("1")), Arrays.asList(new Message("2"), new Message("3"))));
|
||||
|
||||
//#flattening-lists
|
||||
Source<List<Message>, NotUsed> myData = someDataSource;
|
||||
Source<Message, NotUsed> flattened = myData.mapConcat(i -> i);
|
||||
//#flattening-lists
|
||||
|
||||
List<Message> got = flattened.limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
assertEquals(got.get(0), new Message("1"));
|
||||
assertEquals(got.get(1), new Message("2"));
|
||||
assertEquals(got.get(2), new Message("3"));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,235 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.*;
|
||||
import akka.pattern.PatternsCS;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.stream.testkit.TestSubscriber;
|
||||
import akka.stream.testkit.javadsl.TestSink;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.util.Timeout;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
|
||||
public class RecipeGlobalRateLimit extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeGlobalRateLimit");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
static
|
||||
//#global-limiter-actor
|
||||
public class Limiter extends AbstractActor {
|
||||
|
||||
public static class WantToPass {}
|
||||
public static final WantToPass WANT_TO_PASS = new WantToPass();
|
||||
|
||||
public static class MayPass {}
|
||||
public static final MayPass MAY_PASS = new MayPass();
|
||||
|
||||
public static class ReplenishTokens {}
|
||||
public static final ReplenishTokens REPLENISH_TOKENS = new ReplenishTokens();
|
||||
|
||||
private final int maxAvailableTokens;
|
||||
private final FiniteDuration tokenRefreshPeriod;
|
||||
private final int tokenRefreshAmount;
|
||||
|
||||
private final List<ActorRef> waitQueue = new ArrayList<>();
|
||||
private final Cancellable replenishTimer;
|
||||
|
||||
private int permitTokens;
|
||||
|
||||
public static Props props(int maxAvailableTokens, FiniteDuration tokenRefreshPeriod,
|
||||
int tokenRefreshAmount) {
|
||||
return Props.create(Limiter.class, maxAvailableTokens, tokenRefreshPeriod,
|
||||
tokenRefreshAmount);
|
||||
}
|
||||
|
||||
private Limiter(int maxAvailableTokens, FiniteDuration tokenRefreshPeriod,
|
||||
int tokenRefreshAmount) {
|
||||
this.maxAvailableTokens = maxAvailableTokens;
|
||||
this.tokenRefreshPeriod = tokenRefreshPeriod;
|
||||
this.tokenRefreshAmount = tokenRefreshAmount;
|
||||
this.permitTokens = maxAvailableTokens;
|
||||
|
||||
this.replenishTimer = system.scheduler().schedule(
|
||||
this.tokenRefreshPeriod,
|
||||
this.tokenRefreshPeriod,
|
||||
getSelf(),
|
||||
REPLENISH_TOKENS,
|
||||
getContext().getSystem().dispatcher(),
|
||||
getSelf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return open();
|
||||
}
|
||||
|
||||
private Receive open() {
|
||||
return receiveBuilder()
|
||||
.match(ReplenishTokens.class, rt -> {
|
||||
permitTokens = Math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens);
|
||||
})
|
||||
.match(WantToPass.class, wtp -> {
|
||||
permitTokens -= 1;
|
||||
getSender().tell(MAY_PASS, getSelf());
|
||||
if (permitTokens == 0) {
|
||||
getContext().become(closed());
|
||||
}
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
private Receive closed() {
|
||||
return receiveBuilder()
|
||||
.match(ReplenishTokens.class, rt -> {
|
||||
permitTokens = Math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens);
|
||||
releaseWaiting();
|
||||
})
|
||||
.match(WantToPass.class, wtp -> {
|
||||
waitQueue.add(sender());
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
private void releaseWaiting() {
|
||||
final List<ActorRef> toBeReleased = new ArrayList<>(permitTokens);
|
||||
for (Iterator<ActorRef> it = waitQueue.iterator(); permitTokens > 0 && it.hasNext();) {
|
||||
toBeReleased.add(it.next());
|
||||
it.remove();
|
||||
permitTokens --;
|
||||
}
|
||||
|
||||
toBeReleased.stream().forEach(ref -> ref.tell(MAY_PASS, getSelf()));
|
||||
if (permitTokens > 0) {
|
||||
getContext().become(open());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
replenishTimer.cancel();
|
||||
waitQueue.stream().forEach(ref -> {
|
||||
ref.tell(new Status.Failure(new IllegalStateException("limiter stopped")), getSelf());
|
||||
});
|
||||
}
|
||||
}
|
||||
//#global-limiter-actor
|
||||
|
||||
@Test
|
||||
public void work() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
//#global-limiter-flow
|
||||
public <T> Flow<T, T, NotUsed> limitGlobal(ActorRef limiter, FiniteDuration maxAllowedWait) {
|
||||
final int parallelism = 4;
|
||||
final Flow<T, T, NotUsed> f = Flow.create();
|
||||
|
||||
return f.mapAsync(parallelism, element -> {
|
||||
final Timeout triggerTimeout = new Timeout(maxAllowedWait);
|
||||
final CompletionStage<Object> limiterTriggerFuture =
|
||||
PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, triggerTimeout);
|
||||
return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher());
|
||||
});
|
||||
}
|
||||
//#global-limiter-flow
|
||||
|
||||
{
|
||||
// Use a large period and emulate the timer by hand instead
|
||||
ActorRef limiter = system.actorOf(Limiter.props(2, new FiniteDuration(100, TimeUnit.DAYS), 1), "limiter");
|
||||
|
||||
final Iterator<String> e1 = new Iterator<String>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
return "E1";
|
||||
}
|
||||
};
|
||||
final Iterator<String> e2 = new Iterator<String>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
return "E2";
|
||||
}
|
||||
};
|
||||
|
||||
final FiniteDuration twoSeconds = (FiniteDuration) dilated(Duration.create(2, TimeUnit.SECONDS));
|
||||
|
||||
final Sink<String, TestSubscriber.Probe<String>> sink = TestSink.probe(system);
|
||||
final TestSubscriber.Probe<String> probe =
|
||||
RunnableGraph.<TestSubscriber.Probe<String>>fromGraph(
|
||||
GraphDSL.create(sink, (builder, s) -> {
|
||||
final int inputPorts = 2;
|
||||
final UniformFanInShape<String, String> merge = builder.add(Merge.create(inputPorts));
|
||||
|
||||
final SourceShape<String> source1 =
|
||||
builder.add(Source.<String>fromIterator(() -> e1).via(limitGlobal(limiter, twoSeconds)));
|
||||
final SourceShape<String> source2 =
|
||||
builder.add(Source.<String>fromIterator(() -> e2).via(limitGlobal(limiter, twoSeconds)));
|
||||
|
||||
builder.from(source1).toFanIn(merge);
|
||||
builder.from(source2).toFanIn(merge);
|
||||
builder.from(merge).to(s);
|
||||
return ClosedShape.getInstance();
|
||||
})
|
||||
).run(mat);
|
||||
|
||||
probe.expectSubscription().request(1000);
|
||||
|
||||
FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertTrue(probe.expectNext().startsWith("E"));
|
||||
assertTrue(probe.expectNext().startsWith("E"));
|
||||
probe.expectNoMsg(fiveHundredMillis);
|
||||
|
||||
limiter.tell(Limiter.REPLENISH_TOKENS, getTestActor());
|
||||
assertTrue(probe.expectNext().startsWith("E"));
|
||||
probe.expectNoMsg(fiveHundredMillis);
|
||||
|
||||
final Set<String> resultSet = new HashSet<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
limiter.tell(Limiter.REPLENISH_TOKENS, getTestActor());
|
||||
resultSet.add(probe.expectNext());
|
||||
}
|
||||
|
||||
assertTrue(resultSet.contains("E1"));
|
||||
assertTrue(resultSet.contains("E2"));
|
||||
|
||||
probe.expectError();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,198 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.Keep;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.stage.*;
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.stream.testkit.TestSubscriber;
|
||||
import akka.stream.testkit.javadsl.TestSink;
|
||||
import akka.stream.testkit.javadsl.TestSource;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RecipeHold extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeHold");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
//#hold-version-1
|
||||
class HoldWithInitial<T> extends GraphStage<FlowShape<T, T>> {
|
||||
|
||||
public Inlet<T> in = Inlet.<T>create("HoldWithInitial.in");
|
||||
public Outlet<T> out = Outlet.<T>create("HoldWithInitial.out");
|
||||
private FlowShape<T, T> shape = FlowShape.of(in, out);
|
||||
|
||||
private final T initial;
|
||||
|
||||
public HoldWithInitial(T initial) {
|
||||
this.initial = initial;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowShape<T, T> shape() {
|
||||
return shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
|
||||
return new GraphStageLogic(shape) {
|
||||
private T currentValue = initial;
|
||||
|
||||
{
|
||||
setHandler(in, new AbstractInHandler() {
|
||||
@Override
|
||||
public void onPush() throws Exception {
|
||||
currentValue = grab(in);
|
||||
pull(in);
|
||||
}
|
||||
});
|
||||
setHandler(out, new AbstractOutHandler() {
|
||||
@Override
|
||||
public void onPull() throws Exception {
|
||||
push(out, currentValue);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
pull(in);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
//#hold-version-1
|
||||
|
||||
//#hold-version-2
|
||||
class HoldWithWait<T> extends GraphStage<FlowShape<T, T>> {
|
||||
public Inlet<T> in = Inlet.<T>create("HoldWithInitial.in");
|
||||
public Outlet<T> out = Outlet.<T>create("HoldWithInitial.out");
|
||||
private FlowShape<T, T> shape = FlowShape.of(in, out);
|
||||
|
||||
@Override
|
||||
public FlowShape<T, T> shape() {
|
||||
return shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
|
||||
return new GraphStageLogic(shape) {
|
||||
private T currentValue = null;
|
||||
private boolean waitingFirstValue = true;
|
||||
|
||||
{
|
||||
setHandler(in, new AbstractInHandler() {
|
||||
@Override
|
||||
public void onPush() throws Exception {
|
||||
currentValue = grab(in);
|
||||
if (waitingFirstValue) {
|
||||
waitingFirstValue = false;
|
||||
if (isAvailable(out)) push(out, currentValue);
|
||||
}
|
||||
pull(in);
|
||||
}
|
||||
});
|
||||
setHandler(out, new AbstractOutHandler() {
|
||||
@Override
|
||||
public void onPull() throws Exception {
|
||||
if (!waitingFirstValue) push(out, currentValue);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
pull(in);
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
//#hold-version-2
|
||||
|
||||
@Test
|
||||
public void workForVersion1() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<Integer, TestPublisher.Probe<Integer>> source = TestSource.probe(system);
|
||||
final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.probe(system);
|
||||
|
||||
Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubSub =
|
||||
source.via(new HoldWithInitial<>(0)).toMat(sink, Keep.both()).run(mat);
|
||||
TestPublisher.Probe<Integer> pub = pubSub.first();
|
||||
TestSubscriber.Probe<Integer> sub = pubSub.second();
|
||||
|
||||
sub.requestNext(0);
|
||||
sub.requestNext(0);
|
||||
|
||||
pub.sendNext(1);
|
||||
pub.sendNext(2);
|
||||
|
||||
sub.request(2);
|
||||
sub.expectNext(2, 2);
|
||||
|
||||
pub.sendComplete();
|
||||
sub.request(1);
|
||||
sub.expectComplete();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void workForVersion2() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<Integer, TestPublisher.Probe<Integer>> source = TestSource.probe(system);
|
||||
final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.probe(system);
|
||||
|
||||
Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubSub =
|
||||
source.via(new HoldWithWait<>()).toMat(sink, Keep.both()).run(mat);
|
||||
TestPublisher.Probe<Integer> pub = pubSub.first();
|
||||
TestSubscriber.Probe<Integer> sub = pubSub.second();
|
||||
|
||||
FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS);
|
||||
|
||||
sub.request(1);
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(1);
|
||||
sub.expectNext(1);
|
||||
|
||||
pub.sendNext(2);
|
||||
pub.sendNext(3);
|
||||
|
||||
sub.request(2);
|
||||
sub.expectNext(3, 3);
|
||||
|
||||
pub.sendComplete();
|
||||
sub.request(1);
|
||||
sub.expectComplete();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.util.ByteString;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RecipeKeepAlive extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeKeepAlive");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
class Tick {}
|
||||
public final Tick TICK = new Tick();
|
||||
|
||||
@Test
|
||||
public void workForVersion1() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final ByteString keepAliveMessage = ByteString.fromArray(new byte[]{11});
|
||||
|
||||
//@formatter:off
|
||||
//#inject-keepalive
|
||||
Flow<ByteString, ByteString, NotUsed> keepAliveInject =
|
||||
Flow.of(ByteString.class).keepAlive(
|
||||
scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS),
|
||||
() -> keepAliveMessage);
|
||||
//#inject-keepalive
|
||||
//@formatter:on
|
||||
|
||||
// Enough to compile, tested elsewhere as a built-in stage
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Attributes;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.testkit.DebugFilter;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import jdocs.stream.SilenceSystemOut;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.runtime.AbstractFunction0;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public class RecipeLoggingElements extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeLoggingElements", ConfigFactory.parseString("akka.loglevel=DEBUG\nakka.loggers = [akka.testkit.TestEventListener]"));
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void workWithPrintln() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
final SilenceSystemOut.System System = SilenceSystemOut.get(getTestActor());
|
||||
|
||||
{
|
||||
final Source<String, NotUsed> mySource = Source.from(Arrays.asList("1", "2", "3"));
|
||||
|
||||
//#println-debug
|
||||
mySource.map(elem -> {
|
||||
System.out.println(elem);
|
||||
return elem;
|
||||
});
|
||||
//#println-debug
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void workWithLog() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
private <T> T analyse(T i) {
|
||||
return i;
|
||||
}
|
||||
|
||||
{
|
||||
final Source<String, NotUsed> mySource = Source.from(Arrays.asList("1", "2", "3"));
|
||||
|
||||
final int onElement = Logging.WarningLevel();
|
||||
final int onFinish = Logging.ErrorLevel();
|
||||
final int onFailure = Logging.ErrorLevel();
|
||||
|
||||
//#log-custom
|
||||
// customise log levels
|
||||
mySource.log("before-map")
|
||||
.withAttributes(Attributes.createLogLevels(onElement, onFinish, onFailure))
|
||||
.map(i -> analyse(i));
|
||||
|
||||
// or provide custom logging adapter
|
||||
final LoggingAdapter adapter = Logging.getLogger(system, "customLogger");
|
||||
mySource.log("custom", adapter);
|
||||
//#log-custom
|
||||
|
||||
|
||||
new DebugFilter("customLogger", "[custom] Element: ", false, false, 3).intercept(new AbstractFunction0<Object> () {
|
||||
public Void apply() {
|
||||
mySource.log("custom", adapter).runWith(Sink.ignore(), mat);
|
||||
return null;
|
||||
}
|
||||
}, system);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,154 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.stream.testkit.TestSubscriber;
|
||||
import akka.stream.testkit.javadsl.TestSink;
|
||||
import akka.stream.testkit.javadsl.TestSource;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RecipeManualTrigger extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeManualTrigger");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
class Trigger {
|
||||
}
|
||||
|
||||
public final Trigger TRIGGER = new Trigger();
|
||||
|
||||
@Test
|
||||
public void zipped() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<Trigger, TestPublisher.Probe<Trigger>> triggerSource = TestSource.probe(system);
|
||||
final Sink<Message, TestSubscriber.Probe<Message>> messageSink = TestSink.probe(system);
|
||||
|
||||
//#manually-triggered-stream
|
||||
final RunnableGraph<Pair<TestPublisher.Probe<Trigger>, TestSubscriber.Probe<Message>>> g =
|
||||
RunnableGraph.<Pair<TestPublisher.Probe<Trigger>, TestSubscriber.Probe<Message>>>fromGraph(
|
||||
GraphDSL.create(
|
||||
triggerSource,
|
||||
messageSink,
|
||||
(p, s) -> new Pair<>(p, s),
|
||||
(builder, source, sink) -> {
|
||||
SourceShape<Message> elements =
|
||||
builder.add(Source.from(Arrays.asList("1", "2", "3", "4")).map(t -> new Message(t)));
|
||||
FlowShape<Pair<Message, Trigger>, Message> takeMessage =
|
||||
builder.add(Flow.<Pair<Message, Trigger>>create().map(p -> p.first()));
|
||||
final FanInShape2<Message, Trigger, Pair<Message, Trigger>> zip =
|
||||
builder.add(Zip.create());
|
||||
builder.from(elements).toInlet(zip.in0());
|
||||
builder.from(source).toInlet(zip.in1());
|
||||
builder.from(zip.out()).via(takeMessage).to(sink);
|
||||
return ClosedShape.getInstance();
|
||||
}
|
||||
)
|
||||
);
|
||||
//#manually-triggered-stream
|
||||
|
||||
Pair<TestPublisher.Probe<Trigger>, TestSubscriber.Probe<Message>> pubSub = g.run(mat);
|
||||
TestPublisher.Probe<Trigger> pub = pubSub.first();
|
||||
TestSubscriber.Probe<Message> sub = pubSub.second();
|
||||
|
||||
FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS);
|
||||
sub.expectSubscription().request(1000);
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(TRIGGER);
|
||||
sub.expectNext(new Message("1"));
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(TRIGGER);
|
||||
pub.sendNext(TRIGGER);
|
||||
sub.expectNext(new Message("2"));
|
||||
sub.expectNext(new Message("3"));
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(TRIGGER);
|
||||
sub.expectNext(new Message("4"));
|
||||
sub.expectComplete();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void zipWith() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<Trigger, TestPublisher.Probe<Trigger>> triggerSource = TestSource.probe(system);
|
||||
final Sink<Message, TestSubscriber.Probe<Message>> messageSink = TestSink.probe(system);
|
||||
|
||||
//#manually-triggered-stream-zipwith
|
||||
final RunnableGraph<Pair<TestPublisher.Probe<Trigger>, TestSubscriber.Probe<Message>>> g =
|
||||
RunnableGraph.<Pair<TestPublisher.Probe<Trigger>, TestSubscriber.Probe<Message>>>fromGraph(
|
||||
GraphDSL.create(
|
||||
triggerSource,
|
||||
messageSink,
|
||||
(p, s) -> new Pair<>(p, s),
|
||||
(builder, source, sink) -> {
|
||||
final SourceShape<Message> elements =
|
||||
builder.add(Source.from(Arrays.asList("1", "2", "3", "4")).map(t -> new Message(t)));
|
||||
final FanInShape2<Message, Trigger, Message> zipWith =
|
||||
builder.add(ZipWith.create((msg, trigger) -> msg));
|
||||
builder.from(elements).toInlet(zipWith.in0());
|
||||
builder.from(source).toInlet(zipWith.in1());
|
||||
builder.from(zipWith.out()).to(sink);
|
||||
return ClosedShape.getInstance();
|
||||
}
|
||||
)
|
||||
);
|
||||
//#manually-triggered-stream-zipwith
|
||||
|
||||
Pair<TestPublisher.Probe<Trigger>, TestSubscriber.Probe<Message>> pubSub = g.run(mat);
|
||||
TestPublisher.Probe<Trigger> pub = pubSub.first();
|
||||
TestSubscriber.Probe<Message> sub = pubSub.second();
|
||||
|
||||
FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS);
|
||||
sub.expectSubscription().request(1000);
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(TRIGGER);
|
||||
sub.expectNext(new Message("1"));
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(TRIGGER);
|
||||
pub.sendNext(TRIGGER);
|
||||
sub.expectNext(new Message("2"));
|
||||
sub.expectNext(new Message("3"));
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(TRIGGER);
|
||||
sub.expectNext(new Message("4"));
|
||||
sub.expectComplete();
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,98 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.javadsl.Keep;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.stream.testkit.TestSubscriber;
|
||||
import akka.stream.testkit.javadsl.TestSink;
|
||||
import akka.stream.testkit.javadsl.TestSource;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.testkit.TestLatch;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RecipeMissedTicks extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeMissedTicks");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void work() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
class Tick {
|
||||
}
|
||||
|
||||
final Tick Tick = new Tick();
|
||||
|
||||
{
|
||||
final Source<Tick, TestPublisher.Probe<Tick>> tickStream = TestSource.probe(system);
|
||||
final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.probe(system);
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
//#missed-ticks
|
||||
final Flow<Tick, Integer, NotUsed> missedTicks =
|
||||
Flow.of(Tick.class).conflateWithSeed(tick -> 0, (missed, tick) -> missed + 1);
|
||||
//#missed-ticks
|
||||
final TestLatch latch = new TestLatch(3, system);
|
||||
final Flow<Tick, Integer, NotUsed> realMissedTicks =
|
||||
Flow.of(Tick.class).conflateWithSeed(tick -> 0, (missed, tick) -> { latch.countDown(); return missed + 1; });
|
||||
|
||||
Pair<TestPublisher.Probe<Tick>, TestSubscriber.Probe<Integer>> pubSub =
|
||||
tickStream.via(realMissedTicks).toMat(sink, Keep.both()).run(mat);
|
||||
TestPublisher.Probe<Tick> pub = pubSub.first();
|
||||
TestSubscriber.Probe<Integer> sub = pubSub.second();
|
||||
|
||||
pub.sendNext(Tick);
|
||||
pub.sendNext(Tick);
|
||||
pub.sendNext(Tick);
|
||||
pub.sendNext(Tick);
|
||||
|
||||
FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS);
|
||||
|
||||
Await.ready(latch, Duration.create(1, TimeUnit.SECONDS));
|
||||
|
||||
sub.request(1);
|
||||
sub.expectNext(3);
|
||||
sub.request(1);
|
||||
sub.expectNoMsg(timeout);
|
||||
|
||||
pub.sendNext(Tick);
|
||||
sub.expectNext(0);
|
||||
|
||||
pub.sendComplete();
|
||||
sub.request(1);
|
||||
sub.expectComplete();
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,153 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Function;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.stream.javadsl.SubSource;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
|
||||
public class RecipeMultiGroupByTest extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeMultiGroupBy");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
static class Topic {
|
||||
private final String name;
|
||||
|
||||
public Topic(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Topic topic = (Topic) o;
|
||||
|
||||
if (name != null ? !name.equals(topic.name) : topic.name != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return name != null ? name.hashCode() : 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void work() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
final List<Topic> extractTopics(Message m) {
|
||||
final List<Topic> topics = new ArrayList<>(2);
|
||||
|
||||
if (m.msg.startsWith("1")) {
|
||||
topics.add(new Topic("1"));
|
||||
} else {
|
||||
topics.add(new Topic("1"));
|
||||
topics.add(new Topic("2"));
|
||||
}
|
||||
|
||||
return topics;
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
final Source<Message, NotUsed> elems = Source
|
||||
.from(Arrays.asList("1: a", "1: b", "all: c", "all: d", "1: e"))
|
||||
.map(s -> new Message(s));
|
||||
|
||||
//#multi-groupby
|
||||
final Function<Message, List<Topic>> topicMapper = m -> extractTopics(m);
|
||||
|
||||
final Source<Pair<Message, Topic>, NotUsed> messageAndTopic = elems
|
||||
.mapConcat((Message msg) -> {
|
||||
List<Topic> topicsForMessage = topicMapper.apply(msg);
|
||||
// Create a (Msg, Topic) pair for each of the topics
|
||||
|
||||
// the message belongs to
|
||||
return topicsForMessage
|
||||
.stream()
|
||||
.map(topic -> new Pair<Message, Topic>(msg, topic))
|
||||
.collect(toList());
|
||||
});
|
||||
|
||||
SubSource<Pair<Message, Topic>, NotUsed> multiGroups = messageAndTopic
|
||||
.groupBy(2, pair -> pair.second())
|
||||
.map(pair -> {
|
||||
Message message = pair.first();
|
||||
Topic topic = pair.second();
|
||||
|
||||
// do what needs to be done
|
||||
//#multi-groupby
|
||||
return pair;
|
||||
//#multi-groupby
|
||||
});
|
||||
//#multi-groupby
|
||||
|
||||
CompletionStage<List<String>> result = multiGroups
|
||||
.grouped(10)
|
||||
.mergeSubstreams()
|
||||
.map(pair -> {
|
||||
Topic topic = pair.get(0).second();
|
||||
return topic.name + mkString(pair.stream().map(p -> p.first().msg).collect(toList()), "[", ", ", "]");
|
||||
})
|
||||
.grouped(10)
|
||||
.runWith(Sink.head(), mat);
|
||||
|
||||
List<String> got = result.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assertTrue(got.contains("1[1: a, 1: b, all: c, all: d, 1: e]"));
|
||||
assertTrue(got.contains("2[all: c, all: d]"));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static final String mkString(List<String> l, String start, String separate, String end) {
|
||||
StringBuilder sb = new StringBuilder(start);
|
||||
for (String s : l) {
|
||||
sb.append(s).append(separate);
|
||||
}
|
||||
return sb
|
||||
.delete(sb.length() - separate.length(), sb.length())
|
||||
.append(end).toString();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Framing;
|
||||
import akka.stream.javadsl.FramingTruncation;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.util.ByteString;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RecipeParseLines extends RecipeTest {
|
||||
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeParseLines");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseLines() throws Exception {
|
||||
final Source<ByteString, NotUsed> rawData = Source.from(Arrays.asList(
|
||||
ByteString.fromString("Hello World"),
|
||||
ByteString.fromString("\r"),
|
||||
ByteString.fromString("!\r"),
|
||||
ByteString.fromString("\nHello Akka!\r\nHello Streams!"),
|
||||
ByteString.fromString("\r\n\r\n")));
|
||||
|
||||
//#parse-lines
|
||||
final Source<String, NotUsed> lines = rawData
|
||||
.via(Framing.delimiter(ByteString.fromString("\r\n"), 100, FramingTruncation.ALLOW))
|
||||
.map(b -> b.utf8String());
|
||||
//#parse-lines
|
||||
lines.limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,122 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.function.Function;
|
||||
import akka.japi.function.Function2;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class RecipeReduceByKeyTest extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeReduceByKey");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void work() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "world", "and", "hello", "akka"));
|
||||
|
||||
//#word-count
|
||||
final int MAXIMUM_DISTINCT_WORDS = 1000;
|
||||
|
||||
final Source<Pair<String, Integer>, NotUsed> counts = words
|
||||
// split the words into separate streams first
|
||||
.groupBy(MAXIMUM_DISTINCT_WORDS, i -> i)
|
||||
//transform each element to pair with number of words in it
|
||||
.map(i -> new Pair<>(i, 1))
|
||||
// add counting logic to the streams
|
||||
.reduce((left, right) -> new Pair<>(left.first(), left.second() + right.second()))
|
||||
// get a stream of word counts
|
||||
.mergeSubstreams();
|
||||
//#word-count
|
||||
|
||||
final CompletionStage<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);
|
||||
final Set<Pair<String, Integer>> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS).stream().collect(Collectors.toSet());
|
||||
final Set<Pair<String, Integer>> expected = new HashSet<>();
|
||||
expected.add(new Pair<>("hello", 2));
|
||||
expected.add(new Pair<>("world", 1));
|
||||
expected.add(new Pair<>("and", 1));
|
||||
expected.add(new Pair<>("akka", 1));
|
||||
Assert.assertEquals(expected, result);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
//#reduce-by-key-general
|
||||
static public <In, K, Out> Flow<In, Pair<K, Out>, NotUsed> reduceByKey(
|
||||
int maximumGroupSize,
|
||||
Function<In, K> groupKey,
|
||||
Function<In, Out> map,
|
||||
Function2<Out, Out, Out> reduce) {
|
||||
|
||||
return Flow.<In> create()
|
||||
.groupBy(maximumGroupSize, groupKey)
|
||||
.map(i -> new Pair<>(groupKey.apply(i), map.apply(i)))
|
||||
.reduce((left, right) -> new Pair<>(left.first(), reduce.apply(left.second(), right.second())))
|
||||
.mergeSubstreams();
|
||||
}
|
||||
//#reduce-by-key-general
|
||||
|
||||
@Test
|
||||
public void workGeneralised() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<String, NotUsed> words = Source.from(Arrays.asList("hello", "world", "and", "hello", "akka"));
|
||||
|
||||
//#reduce-by-key-general2
|
||||
final int MAXIMUM_DISTINCT_WORDS = 1000;
|
||||
|
||||
Source<Pair<String, Integer>, NotUsed> counts = words.via(reduceByKey(
|
||||
MAXIMUM_DISTINCT_WORDS,
|
||||
word -> word,
|
||||
word -> 1,
|
||||
(left, right) -> left + right));
|
||||
|
||||
//#reduce-by-key-general2
|
||||
final CompletionStage<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);
|
||||
final Set<Pair<String, Integer>> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS).stream().collect(Collectors.toSet());
|
||||
final Set<Pair<String, Integer>> expected = new HashSet<>();
|
||||
expected.add(new Pair<>("hello", 2));
|
||||
expected.add(new Pair<>("world", 1));
|
||||
expected.add(new Pair<>("and", 1));
|
||||
expected.add(new Pair<>("akka", 1));
|
||||
Assert.assertEquals(expected, result);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RecipeSeq extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeLoggingElements");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void drainSourceToList() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<String, NotUsed> mySource = Source.from(Arrays.asList("1", "2", "3"));
|
||||
//#draining-to-list-unsafe
|
||||
// Dangerous: might produce a collection with 2 billion elements!
|
||||
final CompletionStage<List<String>> strings = mySource.runWith(Sink.seq(), mat);
|
||||
//#draining-to-list-unsafe
|
||||
|
||||
strings.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void drainSourceToListWithLimit() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<String, NotUsed> mySource = Source.from(Arrays.asList("1", "2", "3"));
|
||||
//#draining-to-list-safe
|
||||
final int MAX_ALLOWED_SIZE = 100;
|
||||
|
||||
// OK. Future will fail with a `StreamLimitReachedException`
|
||||
// if the number of incoming elements is larger than max
|
||||
final CompletionStage<List<String>> strings =
|
||||
mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);
|
||||
//#draining-to-list-safe
|
||||
|
||||
strings.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void drainSourceToListWithTake() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
final Source<String, NotUsed> mySource = Source.from(Arrays.asList("1", "2", "3"));
|
||||
final int MAX_ALLOWED_SIZE = 100;
|
||||
|
||||
//#draining-to-list-safe
|
||||
|
||||
// OK. Collect up until max-th elements only, then cancel upstream
|
||||
final CompletionStage<List<String>> strings =
|
||||
mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);
|
||||
//#draining-to-list-safe
|
||||
|
||||
strings.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.stream.testkit.TestSubscriber;
|
||||
import akka.stream.testkit.javadsl.TestSink;
|
||||
import akka.stream.testkit.javadsl.TestSource;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.testkit.TestLatch;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RecipeSimpleDrop extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeSimpleDrop");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void work() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
@SuppressWarnings("unused")
|
||||
//#simple-drop
|
||||
final Flow<Message, Message, NotUsed> droppyStream =
|
||||
Flow.of(Message.class).conflate((lastMessage, newMessage) -> newMessage);
|
||||
//#simple-drop
|
||||
final TestLatch latch = new TestLatch(2, system);
|
||||
final Flow<Message, Message, NotUsed> realDroppyStream =
|
||||
Flow.of(Message.class).conflate((lastMessage, newMessage) -> { latch.countDown(); return newMessage; });
|
||||
|
||||
final Pair<TestPublisher.Probe<Message>, TestSubscriber.Probe<Message>> pubSub = TestSource
|
||||
.<Message> probe(system)
|
||||
.via(realDroppyStream)
|
||||
.toMat(TestSink.probe(system),
|
||||
(pub, sub) -> new Pair<>(pub, sub))
|
||||
.run(mat);
|
||||
final TestPublisher.Probe<Message> pub = pubSub.first();
|
||||
final TestSubscriber.Probe<Message> sub = pubSub.second();
|
||||
|
||||
pub.sendNext(new Message("1"));
|
||||
pub.sendNext(new Message("2"));
|
||||
pub.sendNext(new Message("3"));
|
||||
|
||||
Await.ready(latch, Duration.create(1, TimeUnit.SECONDS));
|
||||
|
||||
sub.requestNext(new Message("3"));
|
||||
|
||||
pub.sendComplete();
|
||||
sub.request(1);
|
||||
sub.expectComplete();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import jdocs.AbstractJavaTest;
|
||||
|
||||
public abstract class RecipeTest extends AbstractJavaTest {
|
||||
final class Message {
|
||||
public final String msg;
|
||||
|
||||
public Message(String msg) {
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Message message = (Message) o;
|
||||
|
||||
if (msg != null ? !msg.equals(message.msg) : message.msg != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return msg != null ? msg.hashCode() : 0;
|
||||
}
|
||||
}
|
||||
|
||||
final class Trigger {
|
||||
}
|
||||
|
||||
final class Job {
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package jdocs.stream.javadsl.cookbook;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class RecipeWorkerPool extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("RecipeWorkerPool");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
//#worker-pool
|
||||
public static <In, Out> Flow<In, Out, NotUsed> balancer(
|
||||
Flow<In, Out, NotUsed> worker, int workerCount) {
|
||||
return Flow.fromGraph(GraphDSL.create(b -> {
|
||||
boolean waitForAllDownstreams = true;
|
||||
final UniformFanOutShape<In, In> balance =
|
||||
b.add(Balance.<In>create(workerCount, waitForAllDownstreams));
|
||||
final UniformFanInShape<Out, Out> merge =
|
||||
b.add(Merge.<Out>create(workerCount));
|
||||
|
||||
for (int i = 0; i < workerCount; i++) {
|
||||
b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i));
|
||||
}
|
||||
|
||||
return FlowShape.of(balance.in(), merge.out());
|
||||
}));
|
||||
}
|
||||
//#worker-pool
|
||||
|
||||
@Test
|
||||
public void workForVersion1() throws Exception {
|
||||
new JavaTestKit(system) {
|
||||
{
|
||||
Source<Message, NotUsed> data =
|
||||
Source
|
||||
.from(Arrays.asList("1", "2", "3", "4", "5"))
|
||||
.map(t -> new Message(t));
|
||||
|
||||
Flow<Message, Message, NotUsed> worker = Flow.of(Message.class).map(m -> new Message(m.msg + " done"));
|
||||
|
||||
//#worker-pool2
|
||||
Flow<Message, Message, NotUsed> balancer = balancer(worker, 3);
|
||||
Source<Message, NotUsed> processedJobs = data.via(balancer);
|
||||
//#worker-pool2
|
||||
|
||||
FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS);
|
||||
CompletionStage<List<String>> future = processedJobs.map(m -> m.msg).limit(10).runWith(Sink.seq(), mat);
|
||||
List<String> got = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
assertTrue(got.contains("1 done"));
|
||||
assertTrue(got.contains("2 done"));
|
||||
assertTrue(got.contains("3 done"));
|
||||
assertTrue(got.contains("4 done"));
|
||||
assertTrue(got.contains("5 done"));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue