Docs on fromSinkAndSource #25468 (#28349)

This commit is contained in:
Johan Andrén 2019-12-16 15:01:45 +01:00 committed by GitHub
parent 3a35851fef
commit 4df5376dcb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 224 additions and 18 deletions

View file

@ -0,0 +1,88 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.flow;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.japi.Pair;
import akka.stream.javadsl.*;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.util.ByteString;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
public class FromSinkAndSource {
void halfClosedTcpServer() {
ActorSystem system = null;
// #halfClosedTcpServer
// close in immediately
Sink<ByteString, NotUsed> sink = Sink.cancelled();
// periodic tick out
Source<ByteString, Cancellable> source =
Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick")
.map(tick -> ByteString.fromString(System.currentTimeMillis() + "\n"));
Flow<ByteString, ByteString, NotUsed> serverFlow = Flow.fromSinkAndSource(sink, source);
Source<Tcp.IncomingConnection, CompletionStage<Tcp.ServerBinding>> connectionStream =
Tcp.get(system).bind("127.0.0.1", 9999);
connectionStream.runForeach(
incomingConnection -> incomingConnection.handleWith(serverFlow, system), system);
// #halfClosedTcpServer
}
void chat() {
ActorSystem system = null;
// #chat
Pair<Sink<String, NotUsed>, Source<String, NotUsed>> pair =
MergeHub.of(String.class).toMat(BroadcastHub.of(String.class), Keep.both()).run(system);
Sink<String, NotUsed> sink = pair.first();
Source<String, NotUsed> source = pair.second();
Flow<ByteString, ByteString, NotUsed> framing =
Framing.delimiter(ByteString.fromString("\n"), 1024);
Sink<ByteString, NotUsed> sinkWithFraming =
framing.map(bytes -> bytes.utf8String()).to(pair.first());
Source<ByteString, NotUsed> sourceWithFraming =
source.map(text -> ByteString.fromString(text + "\n"));
Flow<ByteString, ByteString, NotUsed> serverFlow =
Flow.fromSinkAndSource(sinkWithFraming, sourceWithFraming);
Tcp.get(system)
.bind("127.0.0.1", 9999)
.runForeach(
incomingConnection -> incomingConnection.handleWith(serverFlow, system), system);
// #chat
}
<In, Out> void myApiThatTakesAFlow(Flow<In, Out, NotUsed> flow) {
throw new UnsupportedOperationException();
}
void testing() {
ActorSystem system = null;
// #testing
TestSubscriber.Probe<String> inProbe = TestSubscriber.probe(system);
TestPublisher.Probe<String> outProbe = TestPublisher.probe(0, system);
Flow<String, String, NotUsed> testFlow =
Flow.fromSinkAndSource(Sink.fromSubscriber(inProbe), Source.fromPublisher(outProbe));
myApiThatTakesAFlow(testFlow);
inProbe.expectNext("first");
outProbe.expectRequest();
outProbe.sendError(new RuntimeException("test error"));
// ...
// #testing
}
}

View file

@ -0,0 +1,77 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.flow
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.BroadcastHub
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Framing
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.MergeHub
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Tcp
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.Utils.TE
import akka.stream.testkit.scaladsl.TestSource
import akka.stream.testkit.scaladsl.TestSink
import akka.util.ByteString
import scala.concurrent.duration._
object FromSinkAndSource {
implicit val system: ActorSystem = ???
def halfClosedTcpServer(): Unit = {
// #halfClosedTcpServer
// close in immediately
val sink = Sink.cancelled[ByteString]
// periodic tick out
val source =
Source.tick(1.second, 1.second, "tick").map(_ => ByteString(System.currentTimeMillis().toString + "\n"))
val serverFlow = Flow.fromSinkAndSource(sink, source)
Tcp().bind("127.0.0.1", 9999).runForeach { incomingConnection =>
incomingConnection.handleWith(serverFlow)
}
// #halfClosedTcpServer
}
def chat(): Unit = {
// #chat
val (sink, source) = MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()
val framing = Framing.delimiter(ByteString("\n"), 1024)
val sinkWithFraming = framing.map(bytes => bytes.utf8String).to(sink)
val sourceWithFraming = source.map(text => ByteString(text + "\n"))
val serverFlow = Flow.fromSinkAndSource(sinkWithFraming, sourceWithFraming)
Tcp().bind("127.0.0.1", 9999).runForeach { incomingConnection =>
incomingConnection.handleWith(serverFlow)
}
// #chat
}
def testing(): Unit = {
def myApiThatTakesAFlow[In, Out](flow: Flow[In, Out, NotUsed]): Unit = ???
// #testing
val inProbe = TestSubscriber.probe[String]
val outProbe = TestPublisher.probe[String]()
val testFlow = Flow.fromSinkAndSource(Sink.fromSubscriber(inProbe), Source.fromPublisher(outProbe))
myApiThatTakesAFlow(testFlow)
inProbe.expectNext("first")
outProbe.expectRequest()
outProbe.sendError(new RuntimeException("test error"))
// ...
// #testing
}
}