!str #16937 Remove section, in favor of via and separate flow

This commit is contained in:
Patrik Nordwall 2015-04-09 15:16:59 +02:00
parent bb0bed7d3c
commit b69988c190
36 changed files with 190 additions and 271 deletions

View file

@ -37,7 +37,7 @@ elements that cause the division by zero are effectively dropped.
Be aware that dropping elements may result in deadlocks in graphs with Be aware that dropping elements may result in deadlocks in graphs with
cycles, as explained in :ref:`graph-cycles-java`. cycles, as explained in :ref:`graph-cycles-java`.
The supervision strategy can also be defined for a section of flow operators. The supervision strategy can also be defined for all operators of a flow.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#resume-section .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#resume-section

View file

@ -26,7 +26,7 @@ which will be responsible for materializing and running the streams we are about
The :class:`ActorFlowMaterializer` can optionally take :class:`ActorFlowMaterializerSettings` which can be used to define The :class:`ActorFlowMaterializer` can optionally take :class:`ActorFlowMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-java`), the dispatcher to materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-java`), the dispatcher to
be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this be used by the pipeline etc. These can be overridden on a flow, source and sink, but this
will be discussed in depth in :ref:`stream-section-configuration`. will be discussed in depth in :ref:`stream-section-configuration`.
Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source<Out>`: Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source<Out>`:

View file

@ -66,7 +66,8 @@ Alternatively they can be set by passing a :class:`ActorFlowMaterializerSettings
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#materializer-buffer .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#materializer-buffer
If buffer size needs to be set for segments of a Flow only, it is possible by defining a ``section()``: If buffer size needs to be set for segments of a :class:`Flow` only, it is possible by defining a separate
:class:`Flow` with these attributes:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#section-buffer .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#section-buffer

View file

@ -50,9 +50,11 @@ class FlowErrorDocSpec extends AkkaSpec {
case _: ArithmeticException => Supervision.Resume case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop case _ => Supervision.Stop
} }
val source = Source(0 to 5).section(OperationAttributes.supervisionStrategy(decider)) { val flow = Flow[Int]
_.filter(100 / _ < 50).map(elem => 100 / (5 - elem)) .filter(100 / _ < 50).map(elem => 100 / (5 - elem))
} .withAttributes(OperationAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)
val result = source.runWith(Sink.fold(0)(_ + _)) val result = source.runWith(Sink.fold(0)(_ + _))
// the elements causing division by zero will be dropped // the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150) // result here will be a Future completed with Success(150)
@ -68,13 +70,13 @@ class FlowErrorDocSpec extends AkkaSpec {
case _: IllegalArgumentException => Supervision.Restart case _: IllegalArgumentException => Supervision.Restart
case _ => Supervision.Stop case _ => Supervision.Stop
} }
val source = Source(List(1, 3, -1, 5, 7)).section( val flow = Flow[Int]
OperationAttributes.supervisionStrategy(decider)) { .scan(0) { (acc, elem) =>
_.scan(0) { (acc, elem) => if (elem < 0) throw new IllegalArgumentException("negative not allowed")
if (elem < 0) throw new IllegalArgumentException("negative not allowed") else acc + elem
else acc + elem
}
} }
.withAttributes(OperationAttributes.supervisionStrategy(decider))
val source = Source(List(1, 3, -1, 5, 7)).via(flow)
val result = source.grouped(1000).runWith(Sink.head) val result = source.grouped(1000).runWith(Sink.head)
// the negative element cause the scan stage to be restarted, // the negative element cause the scan stage to be restarted,
// i.e. start from 0 again // i.e. start from 0 again

View file

@ -19,6 +19,7 @@ import scala.concurrent.ExecutionContext
import akka.stream.ActorFlowMaterializerSettings import akka.stream.ActorFlowMaterializerSettings
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.stream.Supervision import akka.stream.Supervision
import akka.stream.scaladsl.Flow
object IntegrationDocSpec { object IntegrationDocSpec {
import TwitterStreamQuickstartDocSpec._ import TwitterStreamQuickstartDocSpec._
@ -173,9 +174,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
import Supervision.resumingDecider import Supervision.resumingDecider
val emailAddresses: Source[String, Unit] = val emailAddresses: Source[String, Unit] =
authors.section(supervisionStrategy(resumingDecider)) { authors.via(
_.mapAsync(4, author => addressSystem.lookupEmail(author.handle)) Flow[Author].mapAsync(4, author => addressSystem.lookupEmail(author.handle))
} .withAttributes(supervisionStrategy(resumingDecider)))
//#email-addresses-mapAsync-supervision //#email-addresses-mapAsync-supervision
} }
@ -263,15 +264,13 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
.collect { case Some(phoneNo) => phoneNo } .collect { case Some(phoneNo) => phoneNo }
//#blocking-map //#blocking-map
val send = Flow[String]
.map { phoneNo =>
smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
}
.withAttributes(OperationAttributes.dispatcher("blocking-dispatcher"))
val sendTextMessages: RunnableFlow[Unit] = val sendTextMessages: RunnableFlow[Unit] =
phoneNumbers phoneNumbers.via(send).to(Sink.ignore)
.section(OperationAttributes.dispatcher("blocking-dispatcher")) {
_.map { phoneNo =>
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
}
}
.to(Sink.ignore)
sendTextMessages.run() sendTextMessages.run()
//#blocking-map //#blocking-map

View file

@ -28,13 +28,9 @@ class StreamBuffersRateSpec extends AkkaSpec {
//#materializer-buffer //#materializer-buffer
//#section-buffer //#section-buffer
val flow = val section = Flow[Int].map(_ * 2)
Flow[Int] .withAttributes(OperationAttributes.inputBuffer(initial = 1, max = 1))
.section(OperationAttributes.inputBuffer(initial = 1, max = 1)) { sectionFlow => val flow = section.via(Flow[Int].map(_ / 2)) // the buffer size of this map is the default
// the buffer size of this map is 1
sectionFlow.map(_ * 2)
}
.map(_ / 2) // the buffer size of this map is the default
//#section-buffer //#section-buffer
} }

View file

@ -37,7 +37,7 @@ elements that cause the division by zero are effectively dropped.
Be aware that dropping elements may result in deadlocks in graphs with Be aware that dropping elements may result in deadlocks in graphs with
cycles, as explained in :ref:`graph-cycles-scala`. cycles, as explained in :ref:`graph-cycles-scala`.
The supervision strategy can also be defined for a section of flow operators. The supervision strategy can also be defined for all operators of a flow.
.. includecode:: code/docs/stream/FlowErrorDocSpec.scala#resume-section .. includecode:: code/docs/stream/FlowErrorDocSpec.scala#resume-section

View file

@ -66,7 +66,8 @@ Alternatively they can be set by passing a :class:`ActorFlowMaterializerSettings
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#materializer-buffer .. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#materializer-buffer
If buffer size needs to be set for segments of a Flow only, it is possible by defining a ``section()``: If buffer size needs to be set for segments of a :class:`Flow` only, it is possible by defining a separate
:class:`Flow` with these attributes:
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#section-buffer .. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#section-buffer

View file

@ -60,7 +60,7 @@ private[http] object OutgoingConnectionBlueprint {
val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest] val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest]
.map(RequestRenderingContext(_, remoteAddress)) .map(RequestRenderingContext(_, remoteAddress))
.section(name("renderer"))(_.transform(() requestRendererFactory.newRenderer)) .via(Flow[RequestRenderingContext].transform(() requestRendererFactory.newRenderer).named("renderer"))
.flatten(FlattenStrategy.concat) .flatten(FlattenStrategy.concat)
val methodBypass = Flow[HttpRequest].map(_.method) val methodBypass = Flow[HttpRequest].map(_.method)
@ -85,8 +85,8 @@ private[http] object OutgoingConnectionBlueprint {
val terminationFanout = b.add(Broadcast[HttpResponse](2)) val terminationFanout = b.add(Broadcast[HttpResponse](2))
val terminationMerge = b.add(new TerminationMerge) val terminationMerge = b.add(new TerminationMerge)
val bytesOut = (terminationMerge.out ~> val logger = Flow[ByteString].transform(() errorLogger(log, "Outgoing request stream error")).named("errorLogger")
requestRendering.section(name("errorLogger"))(_.transform(() errorLogger(log, "Outgoing request stream error")))).outlet val bytesOut = (terminationMerge.out ~> requestRendering.via(logger)).outlet
val bytesIn = responseParsingMerge.in0 val bytesIn = responseParsingMerge.in0

View file

@ -9,6 +9,7 @@ import scala.annotation.tailrec
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.stream.scaladsl.OperationAttributes._ import akka.stream.scaladsl.OperationAttributes._
import akka.stream.stage.{ Context, PushPullStage } import akka.stream.stage.{ Context, PushPullStage }
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{ Keep, Source } import akka.stream.scaladsl.{ Keep, Source }
import akka.util.ByteString import akka.util.ByteString
import akka.http.model.parser.CharacterClasses import akka.http.model.parser.CharacterClasses
@ -128,7 +129,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
def expect100continueHandling[T, Mat]: Source[T, Mat] Source[T, Mat] = def expect100continueHandling[T, Mat]: Source[T, Mat] Source[T, Mat] =
if (expect100continue) { if (expect100continue) {
_.section(name("expect100continueTrigger"))(_.transform(() new PushPullStage[T, T] { _.via(Flow[T].transform(() new PushPullStage[T, T] {
private var oneHundredContinueSent = false private var oneHundredContinueSent = false
def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
def onPull(ctx: Context[T]) = { def onPull(ctx: Context[T]) = {
@ -139,7 +140,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
} }
ctx.pull() ctx.pull()
} }
})) }).named("expect100continueTrigger"))
} else identityFunc } else identityFunc
teh match { teh match {
@ -176,4 +177,4 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
expect100continue, hostHeaderPresent, closeAfterResponseCompletion) expect100continue, hostHeaderPresent, closeAfterResponseCompletion)
} }
} else failMessageStart("Request is missing required `Host` header") } else failMessageStart("Request is missing required `Host` header")
} }

View file

@ -114,12 +114,11 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`
case HttpEntity.Default(_, contentLength, data) case HttpEntity.Default(_, contentLength, data)
renderContentLength(contentLength) ~~ CrLf renderContentLength(contentLength) ~~ CrLf
renderByteStrings(r, renderByteStrings(r, data.via(CheckContentLengthTransformer.flow(contentLength)))
data.section(name("checkContentLength"))(_.transform(() new CheckContentLengthTransformer(contentLength))))
case HttpEntity.Chunked(_, chunks) case HttpEntity.Chunked(_, chunks)
r ~~ CrLf r ~~ CrLf
renderByteStrings(r, chunks.section(name("chunkTransform"))(_.transform(() new ChunkTransformer))) renderByteStrings(r, chunks.via(ChunkTransformer.flow))
} }
renderRequestLine() renderRequestLine()

View file

@ -172,7 +172,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
renderHeaders(headers.toList) renderHeaders(headers.toList)
renderEntityContentType(r, entity) renderEntityContentType(r, entity)
renderContentLengthHeader(contentLength) ~~ CrLf renderContentLengthHeader(contentLength) ~~ CrLf
byteStrings(data.section(name("checkContentLength"))(_.transform(() new CheckContentLengthTransformer(contentLength)))) byteStrings(data.via(CheckContentLengthTransformer.flow(contentLength)))
case HttpEntity.CloseDelimited(_, data) case HttpEntity.CloseDelimited(_, data)
renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD) renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD)
@ -185,7 +185,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
else { else {
renderHeaders(headers.toList) renderHeaders(headers.toList)
renderEntityContentType(r, entity) ~~ CrLf renderEntityContentType(r, entity) ~~ CrLf
byteStrings(chunks.section(name("renderChunks"))(_.transform(() new ChunkTransformer))) byteStrings(chunks.via(ChunkTransformer.flow))
} }
} }

View file

@ -12,6 +12,7 @@ import akka.stream.stage._
import akka.http.model._ import akka.http.model._
import akka.http.util._ import akka.http.util._
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
import akka.http.model.HttpEntity.ChunkStreamPart
/** /**
* INTERNAL API * INTERNAL API
@ -51,6 +52,10 @@ private object RenderSupport {
messageBytes messageBytes
} }
object ChunkTransformer {
val flow = Flow[ChunkStreamPart].transform(() new ChunkTransformer).named("renderChunks")
}
class ChunkTransformer extends StatefulStage[HttpEntity.ChunkStreamPart, ByteString] { class ChunkTransformer extends StatefulStage[HttpEntity.ChunkStreamPart, ByteString] {
var lastChunkSeen = false var lastChunkSeen = false
@ -67,6 +72,11 @@ private object RenderSupport {
else terminationEmit(Iterator.single(defaultLastChunkBytes), ctx) else terminationEmit(Iterator.single(defaultLastChunkBytes), ctx)
} }
object CheckContentLengthTransformer {
def flow(contentLength: Long) = Flow[ByteString].transform(()
new CheckContentLengthTransformer(contentLength)).named("checkContentLength")
}
class CheckContentLengthTransformer(length: Long) extends PushStage[ByteString, ByteString] { class CheckContentLengthTransformer(length: Long) extends PushStage[ByteString, ByteString] {
var sent = 0L var sent = 0L

View file

@ -48,10 +48,10 @@ private[http] object HttpServerBluePrint {
} }
} }
val requestParsingFlow = Flow[ByteString].section(name("rootParser"))(_.transform(() val requestParsingFlow = Flow[ByteString].transform(()
// each connection uses a single (private) request parser instance for all its requests // each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection // which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef).stage)) rootParser.createShallowCopy(() oneHundredContinueRef).stage).named("rootParser")
val requestPreparation = val requestPreparation =
Flow[RequestOutput] Flow[RequestOutput]
@ -79,10 +79,10 @@ private[http] object HttpServerBluePrint {
val rendererPipeline = val rendererPipeline =
Flow[ResponseRenderingContext] Flow[ResponseRenderingContext]
.section(name("recover"))(_.transform(() new ErrorsTo500ResponseRecovery(log))) // FIXME: simplify after #16394 is closed .via(Flow[ResponseRenderingContext].transform(() new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed
.section(name("renderer"))(_.transform(() responseRendererFactory.newRenderer)) .via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer"))
.flatten(FlattenStrategy.concat) .flatten(FlattenStrategy.concat)
.section(name("errorLogger"))(_.transform(() errorLogger(log, "Outgoing response stream error"))) .via(Flow[ByteString].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger"))
FlowGraph.partial(requestParsingFlow, rendererPipeline)(Keep.right) { implicit b FlowGraph.partial(requestParsingFlow, rendererPipeline)(Keep.right) { implicit b
(requestParsing, renderer) (requestParsing, renderer)

View file

@ -61,7 +61,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
} }
// TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393 // TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393
dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head) dataBytes.via(Flow[ByteString].timerTransform(transformer).named("toStrict")).runWith(Sink.head)
} }
/** /**

View file

@ -50,7 +50,7 @@ private[http] object StreamUtils {
ctx.fail(f(cause)) ctx.fail(f(cause))
} }
Flow[ByteString].section(name("transformError"))(_.transform(() transformer)) Flow[ByteString].transform(() transformer).named("transformError")
} }
def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString, Unit] = { def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString, Unit] = {
@ -84,7 +84,7 @@ private[http] object StreamUtils {
override def initial: State = if (start > 0) skipping else taking(length) override def initial: State = if (start > 0) skipping else taking(length)
} }
Flow[ByteString].section(name("sliceBytes"))(_.transform(() transformer)) Flow[ByteString].transform(() transformer).named("sliceBytes")
} }
def limitByteChunksStage(maxBytesPerChunk: Int): PushPullStage[ByteString, ByteString] = def limitByteChunksStage(maxBytesPerChunk: Int): PushPullStage[ByteString, ByteString] =

View file

@ -443,7 +443,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] = def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] =
Source(input.toList) Source(input.toList)
.map(ByteString.apply) .map(ByteString.apply)
.section(name("parser"))(_.transform(() parser.stage)) .transform(() parser.stage).named("parser")
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail .headAndTail
.collect { .collect {

View file

@ -261,7 +261,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val future = val future =
Source(input.toList) Source(input.toList)
.map(ByteString.apply) .map(ByteString.apply)
.section(name("parser"))(_.transform(() newParserStage(requestMethod))) .transform(() newParserStage(requestMethod)).named("parser")
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail .headAndTail
.collect { .collect {

View file

@ -252,9 +252,9 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
def renderTo(expected: String): Matcher[HttpRequest] = def renderTo(expected: String): Matcher[HttpRequest] =
equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request
val renderer = newRenderer val renderer = newRenderer
val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, serverAddress)). val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, serverAddress))
section(name("renderer"))(_.transform(() renderer)). .transform(() renderer).named("renderer")
runWith(Sink.head), 1.second) .runWith(Sink.head), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis) Await.result(future, 250.millis)
} }

View file

@ -548,9 +548,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
def renderTo(expected: String, close: Boolean): Matcher[ResponseRenderingContext] = def renderTo(expected: String, close: Boolean): Matcher[ResponseRenderingContext] =
equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx
val renderer = newRenderer val renderer = newRenderer
val byteStringSource = Await.result(Source.single(ctx). val byteStringSource = Await.result(Source.single(ctx)
section(name("renderer"))(_.transform(() renderer)). .transform(() renderer).named("renderer")
runWith(Sink.head), 1.second) .runWith(Sink.head), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis) -> renderer.isComplete Await.result(future, 250.millis) -> renderer.isComplete
} }

View file

@ -15,21 +15,17 @@ import akka.stream.stage.*;
import akka.stream.javadsl.japi.*; import akka.stream.javadsl.japi.*;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe; import akka.testkit.TestProbe;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit; import scala.runtime.BoxedUnit;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class FlowGraphTest extends StreamTest { public class FlowGraphTest extends StreamTest {
@ -41,6 +37,7 @@ public class FlowGraphTest extends StreamTest {
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowGraphTest", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowGraphTest",
AkkaSpec.testConf()); AkkaSpec.testConf());
@SuppressWarnings("serial")
public <T> Creator<Stage<T, T>> op() { public <T> Creator<Stage<T, T>> op() {
return new akka.stream.javadsl.japi.Creator<Stage<T, T>>() { return new akka.stream.javadsl.japi.Creator<Stage<T, T>>() {
@Override @Override
@ -62,39 +59,13 @@ public class FlowGraphTest extends StreamTest {
@Test @Test
public void mustBeAbleToUseMerge() throws Exception { public void mustBeAbleToUseMerge() throws Exception {
final Flow<String, String, BoxedUnit> f1 = Flow final Flow<String, String, BoxedUnit> f1 =
.of(String.class) Flow.of(String.class).transform(FlowGraphTest.this.<String> op()).named("f1");
.section( final Flow<String, String, BoxedUnit> f2 =
OperationAttributes.name("f1"), Flow.of(String.class).transform(FlowGraphTest.this.<String> op()).named("f2");
new Function<Flow<String, String, Object>, Flow<String, String, Object>>() { @SuppressWarnings("unused")
@Override final Flow<String, String, BoxedUnit> f3 =
public Flow<String, String, Object> apply( Flow.of(String.class).transform(FlowGraphTest.this.<String> op()).named("f3");
Flow<String, String, Object> flow) {
return flow.transform(FlowGraphTest.this.<String> op());
}
});
final Flow<String, String, BoxedUnit> f2 = Flow
.of(String.class)
.section(
OperationAttributes.name("f2"),
new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override
public Flow<String, String, Object> apply(
Flow<String, String, Object> flow) {
return flow.transform(FlowGraphTest.this.<String> op());
}
});
final Flow<String, String, BoxedUnit> f3 = Flow
.of(String.class)
.section(
OperationAttributes.name("f3"),
new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override
public Flow<String, String, Object> apply(
Flow<String, String, Object> flow) {
return flow.transform(FlowGraphTest.this.<String> op());
}
});
final Source<String, BoxedUnit> in1 = Source.from(Arrays.asList("a", "b", "c")); final Source<String, BoxedUnit> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, BoxedUnit> in2 = Source.from(Arrays.asList("d", "e", "f")); final Source<String, BoxedUnit> in2 = Source.from(Arrays.asList("d", "e", "f"));

View file

@ -18,23 +18,19 @@ import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import scala.runtime.Boxed; import scala.runtime.Boxed;
import scala.runtime.BoxedUnit; import scala.runtime.BoxedUnit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@SuppressWarnings("serial")
public class FlowTest extends StreamTest { public class FlowTest extends StreamTest {
public FlowTest() { public FlowTest() {
super(actorSystemResource); super(actorSystemResource);
@ -242,39 +238,13 @@ public class FlowTest extends StreamTest {
@Test @Test
public void mustBeAbleToUseMerge() throws Exception { public void mustBeAbleToUseMerge() throws Exception {
final Flow<String, String, BoxedUnit> f1 = Flow final Flow<String, String, BoxedUnit> f1 =
.of(String.class) Flow.of(String.class).transform(FlowTest.this.<String> op()).named("f1");
.section( final Flow<String, String, BoxedUnit> f2 =
OperationAttributes.name("f1"), Flow.of(String.class).transform(FlowTest.this.<String> op()).named("f2");
new Function<Flow<String, String, Object>, Flow<String, String, Object>>() { @SuppressWarnings("unused")
@Override final Flow<String, String, BoxedUnit> f3 =
public Flow<String, String, Object> apply( Flow.of(String.class).transform(FlowTest.this.<String> op()).named("f3");
Flow<String, String, Object> flow) {
return flow.transform(FlowTest.this.<String> op());
}
});
final Flow<String, String, BoxedUnit> f2 = Flow
.of(String.class)
.section(
OperationAttributes.name("f2"),
new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override
public Flow<String, String, Object> apply(
Flow<String, String, Object> flow) {
return flow.transform(FlowTest.this.<String> op());
}
});
final Flow<String, String, BoxedUnit> f3 = Flow
.of(String.class)
.section(
OperationAttributes.name("f3"),
new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override
public Flow<String, String, Object> apply(
Flow<String, String, Object> flow) {
return flow.transform(FlowTest.this.<String> op());
}
});
final Source<String, BoxedUnit> in1 = Source.from(Arrays.asList("a", "b", "c")); final Source<String, BoxedUnit> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, BoxedUnit> in2 = Source.from(Arrays.asList("d", "e", "f")); final Source<String, BoxedUnit> in2 = Source.from(Arrays.asList("d", "e", "f"));

View file

@ -14,7 +14,6 @@ import org.reactivestreams.Publisher;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.javadsl.japi.Function2; import akka.stream.javadsl.japi.Function2;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;

View file

@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@SuppressWarnings("serial")
public class SourceTest extends StreamTest { public class SourceTest extends StreamTest {
public SourceTest() { public SourceTest() {
super(actorSystemResource); super(actorSystemResource);

View file

@ -29,12 +29,12 @@ class FlowGraphCompileSpec extends AkkaSpec {
val apples = () Iterator.continually(new Apple) val apples = () Iterator.continually(new Apple)
val f1 = Flow[String].section(name("f1"))(_.transform(op[String, String])) val f1 = Flow[String].transform(op[String, String]).named("f1")
val f2 = Flow[String].section(name("f2"))(_.transform(op[String, String])) val f2 = Flow[String].transform(op[String, String]).named("f2")
val f3 = Flow[String].section(name("f3"))(_.transform(op[String, String])) val f3 = Flow[String].transform(op[String, String]).named("f3")
val f4 = Flow[String].section(name("f4"))(_.transform(op[String, String])) val f4 = Flow[String].transform(op[String, String]).named("f4")
val f5 = Flow[String].section(name("f5"))(_.transform(op[String, String])) val f5 = Flow[String].transform(op[String, String]).named("f5")
val f6 = Flow[String].section(name("f6"))(_.transform(op[String, String])) val f6 = Flow[String].transform(op[String, String]).named("f6")
val in1 = Source(List("a", "b", "c")) val in1 = Source(List("a", "b", "c"))
val in2 = Source(List("d", "e", "f")) val in2 = Source(List("d", "e", "f"))
@ -94,7 +94,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
}.run() }.run()
} }
/** /*
* in ---> f1 -+-> f2 -+-> f3 ---> b.add(out1) * in ---> f1 -+-> f2 -+-> f3 ---> b.add(out1)
* ^ | * ^ |
* | V * | V
@ -161,7 +161,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
val out2 = Sink.publisher[String] val out2 = Sink.publisher[String]
val out9 = Sink.publisher[String] val out9 = Sink.publisher[String]
val out10 = Sink.publisher[String] val out10 = Sink.publisher[String]
def f(s: String) = Flow[String].section(name(s))(_.transform(op[String, String])) def f(s: String) = Flow[String].transform(op[String, String]).named(s)
import FlowGraph.Implicits._ import FlowGraph.Implicits._
in7 ~> f("a") ~> b7 ~> f("b") ~> m11 ~> f("c") ~> b11 ~> f("d") ~> out2 in7 ~> f("a") ~> b7 ~> f("b") ~> m11 ~> f("c") ~> b11 ~> f("d") ~> out2

View file

@ -221,8 +221,9 @@ class FlowGroupBySpec extends AkkaSpec {
"resume stream when groupBy function throws" in { "resume stream when groupBy function throws" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val exc = TE("test") val exc = TE("test")
val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))( val publisher = Source(publisherProbeProbe)
_.groupBy(elem if (elem == 2) throw exc else elem % 2)) .groupBy(elem if (elem == 2) throw exc else elem % 2)
.withAttributes(OperationAttributes.supervisionStrategy(resumingDecider))
.runWith(Sink.publisher) .runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]() val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]()
publisher.subscribe(subscriber) publisher.subscribe(subscriber)

View file

@ -158,10 +158,13 @@ class FlowMapAsyncSpec extends AkkaSpec {
"resume after future failure" in { "resume after future failure" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(4, n Future { val p = Source(1 to 5)
if (n == 3) throw new RuntimeException("err3") with NoStackTrace .mapAsync(4, n Future {
else n if (n == 3) throw new RuntimeException("err3") with NoStackTrace
})).to(Sink(c)).run() else n
})
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
for (n List(1, 2, 4, 5)) c.expectNext(n) for (n List(1, 2, 4, 5)) c.expectNext(n)
@ -171,10 +174,12 @@ class FlowMapAsyncSpec extends AkkaSpec {
"resume when mapAsync throws" in { "resume when mapAsync throws" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(4, n val p = Source(1 to 5)
if (n == 3) throw new RuntimeException("err4") with NoStackTrace .mapAsync(4, n
else Future(n))). if (n == 3) throw new RuntimeException("err4") with NoStackTrace
to(Sink(c)).run() else Future(n))
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
for (n List(1, 2, 4, 5)) c.expectNext(n) for (n List(1, 2, 4, 5)) c.expectNext(n)
@ -191,8 +196,9 @@ class FlowMapAsyncSpec extends AkkaSpec {
"resume when future is completed with null" in { "resume when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))( val p = Source(List("a", "b", "c"))
_.mapAsync(4, elem if (elem == "b") Future.successful(null) else Future.successful(elem))) .mapAsync(4, elem if (elem == "b") Future.successful(null) else Future.successful(elem))
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run() .to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)

View file

@ -112,10 +112,13 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume after future failure" in { "resume after future failure" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(4, n Future { val p = Source(1 to 5)
if (n == 3) throw new RuntimeException("err3") with NoStackTrace .mapAsyncUnordered(4, n Future {
else n if (n == 3) throw new RuntimeException("err3") with NoStackTrace
})).to(Sink(c)).run() else n
})
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet
@ -125,10 +128,12 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume when mapAsyncUnordered throws" in { "resume when mapAsyncUnordered throws" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(4, n val p = Source(1 to 5)
if (n == 3) throw new RuntimeException("err4") with NoStackTrace .mapAsyncUnordered(4, n
else Future(n))). if (n == 3) throw new RuntimeException("err4") with NoStackTrace
to(Sink(c)).run() else Future(n))
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet
@ -145,8 +150,9 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume when future is completed with null" in { "resume when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))( val p = Source(List("a", "b", "c"))
_.mapAsyncUnordered(4, elem if (elem == "b") Future.successful(null) else Future.successful(elem))) .mapAsyncUnordered(4, elem if (elem == "b") Future.successful(null) else Future.successful(elem))
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run() .to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)

View file

@ -11,9 +11,9 @@ import akka.testkit.TestProbe
object FlowSectionSpec { object FlowSectionSpec {
val config = val config =
""" s"""
my-dispatcher1 = ${akka.test.stream-dispatcher} my-dispatcher1 = $${akka.test.stream-dispatcher}
my-dispatcher2 = ${akka.test.stream-dispatcher} my-dispatcher2 = $${akka.test.stream-dispatcher}
""" """
} }
@ -24,7 +24,7 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
"A flow" can { "A flow" can {
"have an op with a different dispatcher" in { "have an op with a different dispatcher" in {
val flow = Flow[Int].section(dispatcher("my-dispatcher1"))(_.map(sendThreadNameTo(testActor))) val flow = Flow[Int].map(sendThreadNameTo(testActor)).withAttributes(dispatcher("my-dispatcher1"))
Source.single(1).via(flow).to(Sink.ignore).run() Source.single(1).via(flow).to(Sink.ignore).run()
@ -32,7 +32,13 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
} }
"have a nested flow with a different dispatcher" in { "have a nested flow with a different dispatcher" in {
val flow = Flow[Int].section(dispatcher("my-dispatcher1"))(_.via(Flow[Int].map(sendThreadNameTo(testActor)))) val flow = Flow() { implicit b
import FlowGraph.Implicits._
val bcast1 = b.add(Broadcast[Int](1))
val bcast2 = b.add(Broadcast[Int](1))
bcast1 ~> Flow[Int].map(sendThreadNameTo(testActor)) ~> bcast2.in
(bcast1.in, bcast2.out(0))
}.withAttributes(dispatcher("my-dispatcher1"))
Source.single(1).via(flow).to(Sink.ignore).run() Source.single(1).via(flow).to(Sink.ignore).run()
@ -40,50 +46,58 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
} }
"have multiple levels of nesting" in { "have multiple levels of nesting" in {
val flow = Flow[Int].section(dispatcher("my-dispatcher1"))(
_.via(Flow[Int].map(sendThreadNameTo(testActor)).section(dispatcher("my-dispatcher2"))(
_.via(Flow[Int].map(sendThreadNameTo(testActor))))))
Source.single(1).via(flow).to(Sink.ignore).run() val probe1 = TestProbe()
val probe2 = TestProbe()
expectMsgType[String] should include("my-dispatcher1") val flow1 = Flow() { implicit b
expectMsgType[String] should include("my-dispatcher2") import FlowGraph.Implicits._
val bcast1 = b.add(Broadcast[Int](1))
val bcast2 = b.add(Broadcast[Int](1))
bcast1 ~> Flow[Int].map(sendThreadNameTo(probe1.ref)) ~> bcast2.in
(bcast1.in, bcast2.out(0))
}.withAttributes(dispatcher("my-dispatcher1"))
val flow2 = Flow() { implicit b
import FlowGraph.Implicits._
val bcast1 = b.add(Broadcast[Int](1))
val bcast2 = b.add(Broadcast[Int](1))
bcast1 ~> flow1.via(Flow[Int].map(sendThreadNameTo(probe2.ref))) ~> bcast2.in
(bcast1.in, bcast2.out(0))
}.withAttributes(dispatcher("my-dispatcher2"))
Source.single(1).via(flow2).to(Sink.ignore).run()
probe1.expectMsgType[String] should include("my-dispatcher1")
probe2.expectMsgType[String] should include("my-dispatcher2")
} }
"have an op section with a name" in { "include name in toString" in {
//FIXME: Flow has no simple toString anymore //FIXME: Flow has no simple toString anymore
pending pending
val n = "Uppercase reverser" val n = "Uppercase reverser"
val f = Flow[String]. val f1 = Flow[String].map(_.toLowerCase())
map(_.toLowerCase()). val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase())
section(name(n)) {
_.map(_.toUpperCase). f1.via(f2).toString should include(n)
map(_.reverse)
}.
map(_.toLowerCase())
f.toString should include(n)
} }
"have an op section with a different dispatcher and name" in { "have an op section with a different dispatcher and name" in {
val defaultDispatcher = TestProbe() val defaultDispatcher = TestProbe()
val customDispatcher = TestProbe() val customDispatcher = TestProbe()
val f = Flow[Int]. val f1 = Flow[Int].map(sendThreadNameTo(defaultDispatcher.ref))
map(sendThreadNameTo(defaultDispatcher.ref)). val f2 = Flow[Int].map(sendThreadNameTo(customDispatcher.ref))
section(dispatcher("my-dispatcher1") and name("separate-disptacher")) { .withAttributes(dispatcher("my-dispatcher1") and name("separate-disptacher"))
_.map(sendThreadNameTo(customDispatcher.ref)).
map(sendThreadNameTo(customDispatcher.ref))
}.
map(sendThreadNameTo(defaultDispatcher.ref))
Source(0 to 2).via(f).runWith(Sink.ignore) Source(0 to 2).via(f1).via(f2).runWith(Sink.ignore)
defaultDispatcher.receiveN(6).foreach { defaultDispatcher.receiveN(3).foreach {
case s: String s should include("akka.test.stream-dispatcher") case s: String s should include("akka.test.stream-dispatcher")
} }
customDispatcher.receiveN(6).foreach { customDispatcher.receiveN(3).foreach {
case s: String s should include("my-dispatcher1") case s: String s should include("my-dispatcher1")
} }
} }

View file

@ -143,8 +143,9 @@ class FlowSplitWhenSpec extends AkkaSpec {
"resume stream when splitWhen function throws" in { "resume stream when splitWhen function throws" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val exc = TE("test") val exc = TE("test")
val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))( val publisher = Source(publisherProbeProbe)
_.splitWhen(elem if (elem == 3) throw exc else elem % 3 == 0)) .splitWhen(elem if (elem == 3) throw exc else elem % 3 == 0)
.withAttributes(OperationAttributes.supervisionStrategy(resumingDecider))
.runWith(Sink.publisher) .runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]() val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]()
publisher.subscribe(subscriber) publisher.subscribe(subscriber)

View file

@ -222,7 +222,7 @@ final case class ActorFlowMaterializerSettings(
/** /**
* Scala API: Decides how exceptions from application code are to be handled, unless * Scala API: Decides how exceptions from application code are to be handled, unless
* overridden for specific sections of the stream operations with * overridden for specific flows of the stream operations with
* [[akka.stream.scaladsl.OperationAttributes#supervisionStrategy]]. * [[akka.stream.scaladsl.OperationAttributes#supervisionStrategy]].
*/ */
def withSupervisionStrategy(decider: Supervision.Decider): ActorFlowMaterializerSettings = def withSupervisionStrategy(decider: Supervision.Decider): ActorFlowMaterializerSettings =
@ -230,7 +230,7 @@ final case class ActorFlowMaterializerSettings(
/** /**
* Java API: Decides how exceptions from application code are to be handled, unless * Java API: Decides how exceptions from application code are to be handled, unless
* overridden for specific sections of the stream operations with * overridden for specific flows of the stream operations with
* [[akka.stream.javadsl.OperationAttributes#supervisionStrategy]]. * [[akka.stream.javadsl.OperationAttributes#supervisionStrategy]].
*/ */
def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = { def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = {

View file

@ -26,11 +26,10 @@ private[akka] trait TimedOps {
def timed[I, O, Mat, Mat2](source: Source[I, Mat], measuredOps: Source[I, Mat] Source[O, Mat2], onComplete: FiniteDuration Unit): Source[O, Mat2] = { def timed[I, O, Mat, Mat2](source: Source[I, Mat], measuredOps: Source[I, Mat] Source[O, Mat2], onComplete: FiniteDuration Unit): Source[O, Mat2] = {
val ctx = new TimedFlowContext val ctx = new TimedFlowContext
val startTimed = (f: Flow[I, I, Any]) f.transform(() new StartTimedFlow(ctx)) val startTimed = Flow[I].transform(() new StartTimed(ctx)).named("startTimed")
val stopTimed = (f: Flow[O, O, Any]) f.transform(() new StopTimed(ctx, onComplete)) val stopTimed = Flow[O].transform(() new StopTimed(ctx, onComplete)).named("stopTimed")
val begin = source.section(name("startTimed"))(startTimed) measuredOps(source.via(startTimed)).via(stopTimed)
measuredOps(begin).section(name("stopTimed"))(stopTimed)
} }
/** /**
@ -43,11 +42,10 @@ private[akka] trait TimedOps {
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type
val ctx = new TimedFlowContext val ctx = new TimedFlowContext
val startTimed = (f: Flow[O, O, Any]) f.transform(() new StartTimedFlow(ctx)) val startTimed = Flow[O].transform(() new StartTimed(ctx)).named("startTimed")
val stopTimed = (f: Flow[Out, Out, Any]) f.transform(() new StopTimed(ctx, onComplete)) val stopTimed = Flow[Out].transform(() new StopTimed(ctx, onComplete)).named("stopTimed")
val begin: Flow[I, O, Mat] = flow.section(name("startTimed"))(startTimed) measuredOps(flow.via(startTimed)).via(stopTimed)
measuredOps(begin).section(name("stopTimed"))(stopTimed)
} }
} }
@ -65,20 +63,16 @@ private[akka] trait TimedIntervalBetweenOps {
* Measures rolling interval between immediately subsequent `matching(o: O)` elements. * Measures rolling interval between immediately subsequent `matching(o: O)` elements.
*/ */
def timedIntervalBetween[O, Mat](source: Source[O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Source[O, Mat] = { def timedIntervalBetween[O, Mat](source: Source[O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Source[O, Mat] = {
source.section(name("timedInterval")) { val timedInterval = Flow[O].transform(() new TimedInterval[O](matching, onInterval)).named("timedInterval")
_.transform(() new TimedIntervalTransformer[O](matching, onInterval)) source.via(timedInterval)
}
} }
/** /**
* Measures rolling interval between immediately subsequent `matching(o: O)` elements. * Measures rolling interval between immediately subsequent `matching(o: O)` elements.
*/ */
def timedIntervalBetween[I, O, Mat](flow: Flow[I, O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[I, O, Mat] = { def timedIntervalBetween[I, O, Mat](flow: Flow[I, O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[I, O, Mat] = {
// todo is there any other way to provide this for Flow / Duct, without duplicating impl? val timedInterval = Flow[O].transform(() new TimedInterval[O](matching, onInterval)).named("timedInterval")
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type flow.via(timedInterval)
flow.section(name("timedInterval")) {
_.transform(() new TimedIntervalTransformer[O](matching, onInterval))
}
} }
} }
@ -108,7 +102,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
} }
} }
final class StartTimedFlow[T](timedContext: TimedFlowContext) extends PushStage[T, T] { final class StartTimed[T](timedContext: TimedFlowContext) extends PushStage[T, T] {
private var started = false private var started = false
override def onPush(elem: T, ctx: Context[T]): SyncDirective = { override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
@ -139,7 +133,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
} }
final class TimedIntervalTransformer[T](matching: T Boolean, onInterval: FiniteDuration Unit) extends PushStage[T, T] { final class TimedInterval[T](matching: T Boolean, onInterval: FiniteDuration Unit) extends PushStage[T, T] {
private var prevNanos = 0L private var prevNanos = 0L
private var matched = 0L private var matched = 0L

View file

@ -408,16 +408,6 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
def concat[M](second: javadsl.Source[Out @uncheckedVariance, M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = def concat[M](second: javadsl.Source[Out @uncheckedVariance, M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] =
new Flow(delegate.concat(second.asScala).mapMaterialized(p Pair(p._1, p._2))) new Flow(delegate.concat(second.asScala).mapMaterialized(p Pair(p._1, p._2)))
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Any], javadsl.Flow[Out, O, Any]] @uncheckedVariance): javadsl.Flow[In, O, Mat] =
new Flow(delegate.section(attributes.asScala) {
val scalaToJava = (flow: scaladsl.Flow[Out, Out, Any]) new javadsl.Flow(flow)
val javaToScala = (flow: javadsl.Flow[Out, O, Any]) flow.asScala
scalaToJava andThen section.apply andThen javaToScala
})
def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr.asScala)) new Flow(delegate.withAttributes(attr.asScala))

View file

@ -498,16 +498,6 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] =
new Source(delegate.flatten(strategy)) new Source(delegate.flatten(strategy))
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Any], javadsl.Flow[Out, O, Any]] @uncheckedVariance): javadsl.Source[O, Mat] =
new Source(delegate.section(attributes.asScala) {
val scalaToJava = (source: scaladsl.Flow[Out, Out, Any]) new javadsl.Flow(source)
val javaToScala = (source: javadsl.Flow[Out, O, Any]) source.asScala
scalaToJava andThen section.apply andThen javaToScala
})
def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] = def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr.asScala)) new Source(delegate.withAttributes(attr.asScala))

View file

@ -277,25 +277,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
source.via(this).toMat(sink)(Keep.both).run() source.via(this).toMat(sink)(Keep.both).run()
} }
// FIXME remove (in favor of .via)
def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) Mat3)(
section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[In, O, Mat3] = {
val subFlow = section(Flow[O2]).module.carbonCopy.withAttributes(attributes).wrap()
if (this.isIdentity) new Flow(subFlow).asInstanceOf[Flow[In, O, Mat3]]
else new Flow(
module
.growConnect(subFlow, shape.outlet, subFlow.shape.inlets.head, combine)
.replaceShape(FlowShape(shape.inlet, subFlow.shape.outlets.head)))
}
/**
* Applies given [[OperationAttributes]] to a given section.
*/
// FIXME remove (in favor of .via)
def section[O, O2 >: Out](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Any]): Flow[In, O, Mat] = {
this.section[O, O2, Any, Mat](attributes, Keep.left)(section)
}
/** Converts this Scala DSL element to it's Java DSL counterpart. */ /** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)

View file

@ -144,20 +144,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
*/ */
def ++[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concat(second) def ++[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concat(second)
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) Mat3)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Source[O, Mat3] = {
val subFlow = section(Flow[O2]).module.carbonCopy.withAttributes(attributes).wrap()
new Source(
module
.growConnect(subFlow, shape.outlet, subFlow.shape.inlets.head, combine)
.replaceShape(SourceShape(subFlow.shape.outlets.head)))
}
def section[O, O2 >: Out](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Any]): Source[O, Mat] =
this.section[O, O2, Any, Mat](attributes, Keep.left)(section)
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
new Source(module.withAttributes(attr).wrap()) new Source(module.withAttributes(attr).wrap())