2018-10-29 17:19:37 +08:00
|
|
|
/*
|
2018-03-13 23:45:55 +09:00
|
|
|
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
2018-01-22 01:16:36 +09:00
|
|
|
*/
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2018-01-22 01:16:36 +09:00
|
|
|
package jdocs.stream;
|
|
|
|
|
|
|
|
|
|
import akka.NotUsed;
|
|
|
|
|
import akka.actor.AbstractActor;
|
|
|
|
|
import akka.actor.ActorRef;
|
|
|
|
|
import akka.actor.ActorSystem;
|
|
|
|
|
import akka.actor.Props;
|
2018-01-22 19:13:40 +09:00
|
|
|
import akka.pattern.PatternsCS;
|
2018-01-22 01:16:36 +09:00
|
|
|
import akka.stream.*;
|
|
|
|
|
import akka.stream.javadsl.*;
|
|
|
|
|
import akka.testkit.javadsl.TestKit;
|
|
|
|
|
import jdocs.AbstractJavaTest;
|
|
|
|
|
import org.junit.Test;
|
|
|
|
|
import scala.concurrent.duration.FiniteDuration;
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
2018-01-22 19:13:40 +09:00
|
|
|
import java.util.concurrent.CompletionStage;
|
2018-01-22 01:16:36 +09:00
|
|
|
|
|
|
|
|
public class FlowStreamRefsDocTest extends AbstractJavaTest {
|
|
|
|
|
|
|
|
|
|
static ActorSystem system = null;
|
|
|
|
|
static Materializer mat = null;
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void compileOnlySpec() {
|
|
|
|
|
// do nothing
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//#offer-source
|
|
|
|
|
static class RequestLogs {
|
|
|
|
|
public final long streamId;
|
|
|
|
|
|
|
|
|
|
public RequestLogs(long streamId) {
|
|
|
|
|
this.streamId = streamId;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static class LogsOffer {
|
|
|
|
|
final SourceRef<String> sourceRef;
|
|
|
|
|
|
|
|
|
|
public LogsOffer(SourceRef<String> sourceRef) {
|
|
|
|
|
this.sourceRef = sourceRef;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static class DataSource extends AbstractActor {
|
|
|
|
|
@Override
|
|
|
|
|
public Receive createReceive() {
|
|
|
|
|
return receiveBuilder()
|
|
|
|
|
.match(RequestLogs.class, this::handleRequestLogs)
|
|
|
|
|
.build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void handleRequestLogs(RequestLogs requestLogs) {
|
|
|
|
|
Source<String, NotUsed> logs = streamLogs(requestLogs.streamId);
|
2018-01-22 19:13:40 +09:00
|
|
|
CompletionStage<SourceRef<String>> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
|
|
|
|
|
|
|
|
|
|
PatternsCS.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher())
|
|
|
|
|
.to(sender());
|
2018-01-22 01:16:36 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Source<String, NotUsed> streamLogs(long streamId) {
|
|
|
|
|
return Source.repeat("[INFO] some interesting logs here (for id: " + streamId + ")");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//#offer-source
|
|
|
|
|
|
|
|
|
|
public void offerSource() {
|
|
|
|
|
new TestKit(system) {{
|
|
|
|
|
|
|
|
|
|
//#offer-source-use
|
|
|
|
|
ActorRef sourceActor = system.actorOf(Props.create(DataSource.class), "dataSource");
|
|
|
|
|
|
|
|
|
|
sourceActor.tell(new RequestLogs(1337), getTestActor());
|
|
|
|
|
LogsOffer offer = expectMsgClass(LogsOffer.class);
|
|
|
|
|
|
|
|
|
|
offer.sourceRef.getSource()
|
|
|
|
|
.runWith(Sink.foreach(log -> System.out.println(log)), mat);
|
|
|
|
|
|
|
|
|
|
//#offer-source-use
|
|
|
|
|
}};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//#offer-sink
|
2018-01-22 19:13:40 +09:00
|
|
|
static class PrepareUpload {
|
2018-01-22 01:16:36 +09:00
|
|
|
final String id;
|
|
|
|
|
|
|
|
|
|
public PrepareUpload(String id) {
|
|
|
|
|
this.id = id;
|
|
|
|
|
}
|
|
|
|
|
}
|
2018-01-22 19:13:40 +09:00
|
|
|
static class MeasurementsSinkReady {
|
2018-01-22 01:16:36 +09:00
|
|
|
final String id;
|
|
|
|
|
final SinkRef<String> sinkRef;
|
|
|
|
|
|
2018-01-22 19:13:40 +09:00
|
|
|
public MeasurementsSinkReady(String id, SinkRef<String> ref) {
|
2018-01-22 01:16:36 +09:00
|
|
|
this.id = id;
|
|
|
|
|
this.sinkRef = ref;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static class DataReceiver extends AbstractActor {
|
|
|
|
|
@Override
|
|
|
|
|
public Receive createReceive() {
|
|
|
|
|
return receiveBuilder()
|
|
|
|
|
.match(PrepareUpload.class, prepare -> {
|
|
|
|
|
Sink<String, NotUsed> sink = logsSinkFor(prepare.id);
|
2018-01-22 19:13:40 +09:00
|
|
|
CompletionStage<SinkRef<String>> sinkRef = StreamRefs.<String>sinkRef().to(sink).run(mat);
|
2018-01-22 01:16:36 +09:00
|
|
|
|
2018-01-22 19:13:40 +09:00
|
|
|
PatternsCS.pipe(sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), context().dispatcher())
|
|
|
|
|
.to(sender());
|
2018-01-22 01:16:36 +09:00
|
|
|
})
|
2018-01-22 19:13:40 +09:00
|
|
|
.build();
|
2018-01-22 01:16:36 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Sink<String, NotUsed> logsSinkFor(String id) {
|
2018-01-22 19:13:40 +09:00
|
|
|
return Sink.<String>ignore().mapMaterializedValue(done -> NotUsed.getInstance());
|
2018-01-22 01:16:36 +09:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//#offer-sink
|
|
|
|
|
|
|
|
|
|
public void offerSink() {
|
|
|
|
|
new TestKit(system) {{
|
|
|
|
|
|
|
|
|
|
//#offer-sink-use
|
|
|
|
|
ActorRef receiver = system.actorOf(Props.create(DataReceiver.class), "dataReceiver");
|
|
|
|
|
|
|
|
|
|
receiver.tell(new PrepareUpload("system-42-tmp"), getTestActor());
|
2018-01-22 19:13:40 +09:00
|
|
|
MeasurementsSinkReady ready = expectMsgClass(MeasurementsSinkReady.class);
|
2018-01-22 01:16:36 +09:00
|
|
|
|
|
|
|
|
Source.repeat("hello")
|
2018-01-22 19:13:40 +09:00
|
|
|
.runWith(ready.sinkRef.getSink(), mat);
|
2018-01-22 01:16:36 +09:00
|
|
|
//#offer-sink-use
|
|
|
|
|
}};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void configureTimeouts() {
|
|
|
|
|
new TestKit(system) {{
|
|
|
|
|
|
|
|
|
|
//#attr-sub-timeout
|
|
|
|
|
FiniteDuration timeout = FiniteDuration.create(5, TimeUnit.SECONDS);
|
|
|
|
|
Attributes timeoutAttributes = StreamRefAttributes.subscriptionTimeout(timeout);
|
|
|
|
|
|
|
|
|
|
// configuring Sink.sourceRef (notice that we apply the attributes to the Sink!):
|
|
|
|
|
Source.repeat("hello")
|
2018-01-22 19:13:40 +09:00
|
|
|
.runWith(StreamRefs.<String>sourceRef().addAttributes(timeoutAttributes), mat);
|
2018-01-22 01:16:36 +09:00
|
|
|
|
|
|
|
|
// configuring SinkRef.source:
|
2018-01-22 19:13:40 +09:00
|
|
|
StreamRefs.<String>sinkRef().addAttributes(timeoutAttributes)
|
|
|
|
|
.runWith(Sink.<String>ignore(), mat); // not very interesting sink, just an example
|
2018-01-22 01:16:36 +09:00
|
|
|
|
|
|
|
|
//#attr-sub-timeout
|
|
|
|
|
}};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|