=str #15152 operation transformations

This commit is contained in:
Martynas Mickevicius 2014-12-01 20:07:55 +02:00
parent 92432cc8b3
commit 64feb041be
46 changed files with 877 additions and 373 deletions

View file

@ -10,6 +10,7 @@ import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
import akka.stream.scaladsl.OperationAttributes._
import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse }
import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser }
@ -40,14 +41,14 @@ private[http] object HttpClient {
Flow[HttpRequest]
.map(requestMethodByPass)
.transform("renderer", () requestRendererFactory.newRenderer)
.section(name("renderer"))(_.transform(() requestRendererFactory.newRenderer))
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing request stream error"))
.section(name("errorLogger"))(_.transform(() errorLogger(log, "Outgoing request stream error")))
.via(transport)
.transform("rootParser", ()
.section(name("rootParser"))(_.transform(()
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(requestMethodByPass))
rootParser.createShallowCopy(requestMethodByPass)))
.splitWhen(_.isInstanceOf[MessageStart])
.headAndTail
.collect {

View file

@ -7,6 +7,7 @@ package akka.http.engine.parsing
import java.lang.{ StringBuilder JStringBuilder }
import scala.annotation.tailrec
import akka.actor.ActorRef
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.stage.{ Context, PushPullStage }
import akka.stream.scaladsl.Source
import akka.util.ByteString
@ -126,7 +127,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
def expect100continueHandling[T]: Source[T] Source[T] =
if (expect100continue) {
_.transform("expect100continueTrigger", () new PushPullStage[T, T] {
_.section(name("expect100continueTrigger"))(_.transform(() new PushPullStage[T, T] {
private var oneHundredContinueSent = false
def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
def onPull(ctx: Context[T]) = {
@ -137,7 +138,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
}
ctx.pull()
}
})
}))
} else identityFunc
teh match {

View file

@ -8,6 +8,7 @@ import java.net.InetSocketAddress
import scala.annotation.tailrec
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.http.model._
@ -114,11 +115,11 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`
case HttpEntity.Default(_, contentLength, data)
renderContentLength(contentLength) ~~ CrLf
renderByteStrings(r,
data.transform("checkContentLength", () new CheckContentLengthTransformer(contentLength)))
data.section(name("checkContentLength"))(_.transform(() new CheckContentLengthTransformer(contentLength))))
case HttpEntity.Chunked(_, chunks)
r ~~ CrLf
renderByteStrings(r, chunks.transform("chunkTransform", () new ChunkTransformer))
renderByteStrings(r, chunks.section(name("chunkTransform"))(_.transform(() new ChunkTransformer)))
}
renderRequestLine()

View file

@ -7,6 +7,7 @@ package akka.http.engine.rendering
import scala.annotation.tailrec
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.http.model._
@ -155,7 +156,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
renderHeaders(headers.toList)
renderEntityContentType(r, entity)
renderContentLengthHeader(contentLength) ~~ CrLf
byteStrings(data.transform("checkContentLength", () new CheckContentLengthTransformer(contentLength)))
byteStrings(data.section(name("checkContentLength"))(_.transform(() new CheckContentLengthTransformer(contentLength))))
case HttpEntity.CloseDelimited(_, data)
renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD)
@ -168,7 +169,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
else {
renderHeaders(headers.toList)
renderEntityContentType(r, entity) ~~ CrLf
byteStrings(chunks.transform("renderChunks", () new ChunkTransformer))
byteStrings(chunks.section(name("renderChunks"))(_.transform(() new ChunkTransformer)))
}
}

View file

@ -8,8 +8,10 @@ import akka.actor.{ ActorRef, Props }
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.stage.PushPullStage
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
import akka.stream.stage.PushPullStage
import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser }
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.engine.parsing.ParserOutput._
@ -51,10 +53,10 @@ private[http] object HttpServer {
val bypassFanout = Broadcast[RequestOutput]("bypassFanout")
val bypassMerge = new BypassMerge(settings, log)
val requestParsing = Flow[ByteString].transform("rootParser", ()
val requestParsing = Flow[ByteString].section(name("rootParser"))(_.transform(()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef))
rootParser.createShallowCopy(() oneHundredContinueRef)))
val requestPreparation =
Flow[RequestOutput]
@ -69,10 +71,10 @@ private[http] object HttpServer {
val rendererPipeline =
Flow[ResponseRenderingContext]
.transform("recover", () new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed
.transform("renderer", () responseRendererFactory.newRenderer)
.section(name("recover"))(_.transform(() new ErrorsTo500ResponseRecovery(log))) // FIXME: simplify after #16394 is closed
.section(name("renderer"))(_.transform(() responseRendererFactory.newRenderer))
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
.section(name("errorLogger"))(_.transform(() errorLogger(log, "Outgoing response stream error")))
val transportIn = UndefinedSource[ByteString]
val transportOut = UndefinedSink[ByteString]

View file

@ -12,6 +12,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.util.ByteString
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._
import akka.stream.TimerTransformer
@ -63,7 +64,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
}
// TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393
dataBytes.timerTransform("toStrict", transformer).runWith(Sink.head)
dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head)
}
/**

View file

@ -19,6 +19,7 @@ import akka.stream.impl.Ast.AstNode
import akka.stream.impl.Ast.StageFactory
import akka.stream.impl.fusing.IteratorInterpreter
import akka.stream.scaladsl._
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.stage._
import akka.stream.impl
import akka.util.ByteString
@ -44,7 +45,7 @@ private[http] object StreamUtils {
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = ctx.absorbTermination()
}
Flow[ByteString].transform("transformBytes", () transformer)
Flow[ByteString].section(name("transformBytes"))(_.transform(() transformer))
}
def failedPublisher[T](ex: Throwable): Publisher[T] =
@ -59,7 +60,7 @@ private[http] object StreamUtils {
ctx.fail(f(cause))
}
Flow[ByteString].transform("transformError", () transformer)
Flow[ByteString].section(name("transformError"))(_.transform(() transformer))
}
def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString] = {
@ -90,7 +91,7 @@ private[http] object StreamUtils {
override def initial: State = if (start > 0) skipping else taking(length)
}
Flow[ByteString].transform("sliceBytes", () transformer)
Flow[ByteString].section(name("sliceBytes"))(_.transform(() transformer))
}
/**
@ -182,7 +183,7 @@ private[http] object StreamUtils {
Try {
transformer match {
// FIXME #16382 right now the flow can't use keys, should that be allowed?
case Pipe(ops, keys) if keys.isEmpty
case Pipe(ops, keys, _) if keys.isEmpty
if (ops.isEmpty)
Some(sourceData)
else {

View file

@ -57,21 +57,20 @@ package object util {
private[http] implicit class EnhancedSource[T](val underlying: Source[T]) {
def printEvent(marker: String): Source[T] =
underlying.transform("transform",
() new PushStage[T, T] {
override def onPush(element: T, ctx: Context[T]): Directive = {
println(s"$marker: $element")
ctx.push(element)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
println(s"$marker: Failure $cause")
super.onUpstreamFailure(cause, ctx)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
println(s"$marker: Terminated")
super.onUpstreamFinish(ctx)
}
})
underlying.transform(() new PushStage[T, T] {
override def onPush(element: T, ctx: Context[T]): Directive = {
println(s"$marker: $element")
ctx.push(element)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
println(s"$marker: Failure $cause")
super.onUpstreamFailure(cause, ctx)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
println(s"$marker: Terminated")
super.onUpstreamFinish(ctx)
}
})
/**
* Drain this stream into a Vector and provide it as a future value.

View file

@ -10,6 +10,7 @@ import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
import org.scalatest.matchers.Matcher
import akka.stream.scaladsl._
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlattenStrategy
import akka.stream.FlowMaterializer
import akka.util.ByteString
@ -441,7 +442,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] =
Source(input.toList)
.map(ByteString.apply)
.transform("parser", () parser)
.section(name("parser"))(_.transform(() parser))
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail
.collect {

View file

@ -10,6 +10,7 @@ import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers }
import org.scalatest.matchers.Matcher
import akka.stream.scaladsl._
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlattenStrategy
import akka.stream.FlowMaterializer
import akka.util.ByteString
@ -260,7 +261,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val future =
Source(input.toList)
.map(ByteString.apply)
.transform("parser", () newParser(requestMethod))
.section(name("parser"))(_.transform(() newParser(requestMethod)))
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail
.collect {

View file

@ -16,6 +16,7 @@ import akka.http.model._
import akka.http.model.headers._
import akka.http.util._
import akka.stream.scaladsl._
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlowMaterializer
import akka.stream.impl.SynchronousIterablePublisher
import HttpEntity._
@ -253,7 +254,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request
val renderer = newRenderer
val byteStringSource = Await.result(Source.singleton(RequestRenderingContext(request, serverAddress)).
transform("renderer", () renderer).
section(name("renderer"))(_.transform(() renderer)).
runWith(Sink.head), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis)

View file

@ -16,6 +16,7 @@ import akka.http.model.headers._
import akka.http.util._
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlowMaterializer
import HttpEntity._
@ -400,7 +401,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx
val renderer = newRenderer
val byteStringSource = Await.result(Source.singleton(ctx).
transform("renderer", () renderer).
section(name("renderer"))(_.transform(() renderer)).
runWith(Sink.head), 1.second)
val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis) -> renderer.isComplete

View file

@ -37,7 +37,7 @@ trait MultipartMarshallers {
HttpEntity(contentType, data)
case _
val chunks = value.parts
.transform("bodyPartRenderer", () BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.transform(() BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.flatten(FlattenStrategy.concat)
HttpEntity.Chunked(contentType, chunks)
}

View file

@ -85,7 +85,7 @@ trait MultipartUnmarshallers {
createStrict(mediaType, builder.result())
case _
val bodyParts = entity.dataBytes
.transform("bodyPart", () parser)
.transform(() parser)
.splitWhen(_.isInstanceOf[BodyPartStart])
.headAndTail
.collect {

View file

@ -6,6 +6,7 @@ package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.impl.{ Ast, ActorBasedFlowMaterializer }
import akka.stream.scaladsl.MaterializedMap
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import org.reactivestreams.{ Publisher, Processor }
import akka.stream.impl.fusing.Map
@ -25,7 +26,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet()
val (processor, _ns) = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode(
Ast.Fused(List(Map[Int, Int](identity)), "identity"), flowName, 1)
Ast.Fused(List(Map[Int, Int](identity)), name("identity")), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}

View file

@ -3,6 +3,7 @@
*/
package akka.stream.tck
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.MaterializerSettings
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
@ -34,7 +35,7 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] {
}
val (processor, _) = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode(
Ast.StageFactory(mkStage, "transform"), flowName, 1)
Ast.StageFactory(mkStage, name("transform")), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}

View file

@ -102,7 +102,7 @@ public class FlowTest extends StreamTest {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
// duplicate each element, stop after 4 elements, and emit sum to the end
Source.from(input).transform("publish", new Creator<Stage<Integer, Integer>>() {
Source.from(input).transform(new Creator<Stage<Integer, Integer>>() {
@Override
public PushPullStage<Integer, Integer> create() throws Exception {
return new StatefulStage<Integer, Integer>() {
@ -248,9 +248,24 @@ public class FlowTest extends StreamTest {
@Test
public void mustBeAbleToUseMerge() throws Exception {
final Flow<String, String> f1 = Flow.of(String.class).transform("f1", this.<String>op()); // javadsl
final Flow<String, String> f2 = Flow.of(String.class).transform("f2", this.<String>op()); // javadsl
final Flow<String, String> f3 = Flow.of(String.class).transform("f2", this.<String>op()); // javadsl
final Flow<String, String> f1 = Flow.of(String.class).section(OperationAttributes.name("f1"), new Function<Flow<String, String>, Flow<String, String>>() {
@Override
public Flow<String, String> apply(Flow<String, String> flow) {
return flow.transform(FlowTest.this.<String>op());
}
});
final Flow<String, String> f2 = Flow.of(String.class).section(OperationAttributes.name("f2"), new Function<Flow<String, String>, Flow<String, String>>() {
@Override
public Flow<String, String> apply(Flow<String, String> flow) {
return flow.transform(FlowTest.this.<String>op());
}
});
final Flow<String, String> f3 = Flow.of(String.class).section(OperationAttributes.name("f3"), new Function<Flow<String, String>, Flow<String, String>>() {
@Override
public Flow<String, String> apply(Flow<String, String> flow) {
return flow.transform(FlowTest.this.<String>op());
}
});
final Source<String> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String> in2 = Source.from(Arrays.asList("d", "e", "f"));

View file

@ -33,7 +33,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
val ignore =
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
Set("create", "apply", "ops", "appendJava", "andThen") ++
Set("create", "apply", "ops", "appendJava", "andThen", "withAttributes") ++
Set("asScala", "asJava")
val allowMissing: Map[Class[_], Set[String]] = Map(

View file

@ -6,20 +6,28 @@ package akka.stream.scaladsl
import akka.testkit.TestProbe
import akka.stream.testkit.AkkaSpec
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
class FlowDispatcherSpec extends AkkaSpec {
class FlowDispatcherSpec extends AkkaSpec("my-dispatcher = ${akka.test.stream-dispatcher}") {
implicit val materializer = FlowMaterializer()
val defaultSettings = MaterializerSettings(system)
"Flow with dispatcher setting" must {
"use the specified dispatcher" in {
val probe = TestProbe()
val p = Source(List(1, 2, 3)).map(i
{ probe.ref ! Thread.currentThread().getName(); i }).
to(Sink.ignore).run()
probe.receiveN(3) foreach {
case s: String s should startWith(system.name + "-akka.test.stream-dispatcher")
}
def testDispatcher(settings: MaterializerSettings = defaultSettings, dispatcher: String = "akka.test.stream-dispatcher") = {
implicit val materializer = FlowMaterializer(settings)
val probe = TestProbe()
val p = Source(List(1, 2, 3)).map(i
{ probe.ref ! Thread.currentThread().getName(); i }).
to(Sink.ignore).run()
probe.receiveN(3) foreach {
case s: String s should startWith(system.name + "-" + dispatcher)
}
}
"Flow with dispatcher setting" must {
"use the default dispatcher" in testDispatcher()
"use custom dispatcher" in testDispatcher(defaultSettings.withDispatcher("my-dispatcher"), "my-dispatcher")
}
}

View file

@ -3,8 +3,9 @@
*/
package akka.stream.scaladsl
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlowMaterializer
import akka.stream.OverflowStrategy
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
import akka.stream.stage._
@ -28,12 +29,12 @@ class FlowGraphCompileSpec extends AkkaSpec {
val apples = () Iterator.continually(new Apple)
val f1 = Flow[String].transform("f1", op[String, String])
val f2 = Flow[String].transform("f2", op[String, String])
val f3 = Flow[String].transform("f3", op[String, String])
val f4 = Flow[String].transform("f4", op[String, String])
val f5 = Flow[String].transform("f5", op[String, String])
val f6 = Flow[String].transform("f6", op[String, String])
val f1 = Flow[String].section(name("f1"))(_.transform(op[String, String]))
val f2 = Flow[String].section(name("f2"))(_.transform(op[String, String]))
val f3 = Flow[String].section(name("f3"))(_.transform(op[String, String]))
val f4 = Flow[String].section(name("f4"))(_.transform(op[String, String]))
val f5 = Flow[String].section(name("f5"))(_.transform(op[String, String]))
val f6 = Flow[String].section(name("f6"))(_.transform(op[String, String]))
val in1 = Source(List("a", "b", "c"))
val in2 = Source(List("d", "e", "f"))
@ -164,7 +165,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
val out2 = Sink.publisher[String]
val out9 = Sink.publisher[String]
val out10 = Sink.publisher[String]
def f(s: String) = Flow[String].transform(s, op[String, String])
def f(s: String) = Flow[String].section(name(s))(_.transform(op[String, String]))
import FlowGraphImplicits._
in7 ~> f("a") ~> b7 ~> f("b") ~> m11 ~> f("c") ~> b11 ~> f("d") ~> out2

View file

@ -0,0 +1,80 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlowMaterializer
import akka.stream.testkit.AkkaSpec
import akka.actor.ActorRef
import akka.testkit.TestProbe
object FlowSectionSpec {
val config = "my-dispatcher = ${akka.test.stream-dispatcher}"
}
class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) {
implicit val mat = FlowMaterializer()
"A flow" can {
"have an op with a name" in {
val n = "Converter to Int"
val f = Flow[Int].section(name(n))(_.map(_.toInt))
f.toString should include(n)
}
"have an op with a different dispatcher" in {
val flow = Flow[Int].section(dispatcher("my-dispatcher"))(_.map(sendThreadNameTo(testActor)))
Source.singleton(1).via(flow).to(Sink.ignore).run()
receiveN(1).foreach {
case s: String s should include("my-dispatcher")
}
}
"have an op section with a name" in {
val n = "Uppercase reverser"
val f = Flow[String].
map(_.toLowerCase()).
section(name(n)) {
_.map(_.toUpperCase).
map(_.reverse)
}.
map(_.toLowerCase())
f.toString should include(n)
}
"have an op section with a different dispatcher and name" in {
val defaultDispatcher = TestProbe()
val customDispatcher = TestProbe()
val f = Flow[Int].
map(sendThreadNameTo(defaultDispatcher.ref)).
section(dispatcher("my-dispatcher") and name("separate-disptacher")) {
_.map(sendThreadNameTo(customDispatcher.ref)).
map(sendThreadNameTo(customDispatcher.ref))
}.
map(sendThreadNameTo(defaultDispatcher.ref))
Source(0 to 2).via(f).runWith(Sink.ignore)
defaultDispatcher.receiveN(6).foreach {
case s: String s should include("akka.test.stream-dispatcher")
}
customDispatcher.receiveN(6).foreach {
case s: String s should include("my-dispatcher")
}
}
def sendThreadNameTo[T](probe: ActorRef)(element: T) = {
probe ! Thread.currentThread.getName
element
}
}
}

View file

@ -59,20 +59,20 @@ object FlowSpec {
override def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): (Processor[In, Out], MaterializedMap) = {
val props = op match {
case f: Fused Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher)
case Map(f) Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage))
case Filter(p) Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage))
case Drop(n) Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage))
case Take(n) Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage))
case Collect(pf) Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage))
case Scan(z, f) Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage))
case Expand(s, f) Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage))
case Conflate(s, f) Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage))
case Buffer(n, s) Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage))
case MapConcat(f) Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage))
case o ActorProcessorFactory.props(this, o)
case f: Fused Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage))
case Map(f, _) Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage))
case Filter(p, _) Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage))
case Drop(n, _) Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage))
case Take(n, _) Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage))
case Collect(pf, _) Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage))
case Scan(z, f, _) Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage))
case Expand(s, f, _) Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage))
case Conflate(s, f, _) Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage))
case Buffer(n, s, _) Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage))
case MapConcat(f, _) Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage))
case o ActorProcessorFactory.props(this, o)
}
val impl = actorOf(props, s"$flowName-$n-${op.name}")
val impl = actorOf(props.withDispatcher(settings.dispatcher), s"$flowName-$n-${op.attributes.name}")
(ActorProcessorFactory(impl), MaterializedMap.empty)
}

View file

@ -25,7 +25,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"produce one-to-one transformation as expected" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new PushStage[Int, Int] {
transform(() new PushStage[Int, Int] {
var tot = 0
override def onPush(elem: Int, ctx: Context[Int]) = {
tot += elem
@ -48,7 +48,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"produce one-to-several transformation as expected" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new StatefulStage[Int, Int] {
transform(() new StatefulStage[Int, Int] {
var tot = 0
lazy val waitForNext = new State {
@ -85,7 +85,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"produce dropping transformation as expected" in {
val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new PushStage[Int, Int] {
transform(() new PushStage[Int, Int] {
var tot = 0
override def onPush(elem: Int, ctx: Context[Int]) = {
tot += elem
@ -111,14 +111,14 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"produce multi-step transformation as expected" in {
val p = Source(List("a", "bc", "def")).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new PushStage[String, Int] {
transform(() new PushStage[String, Int] {
var concat = ""
override def onPush(elem: String, ctx: Context[Int]) = {
concat += elem
ctx.push(concat.length)
}
}).
transform("transform", () new PushStage[Int, Int] {
transform(() new PushStage[Int, Int] {
var tot = 0
override def onPush(length: Int, ctx: Context[Int]) = {
tot += length
@ -150,7 +150,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"support emit onUpstreamFinish" in {
val p = Source(List("a")).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new StatefulStage[String, String] {
transform(() new StatefulStage[String, String] {
var s = ""
override def initial = new State {
override def onPush(element: String, ctx: Context[String]) = {
@ -173,7 +173,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"allow early finish" in {
val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Source(p).
transform("transform", () new PushStage[Int, Int] {
transform(() new PushStage[Int, Int] {
var s = ""
override def onPush(element: Int, ctx: Context[Int]) = {
s += element
@ -199,7 +199,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"report error when exception is thrown" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new StatefulStage[Int, Int] {
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) = {
if (elem == 2) {
@ -227,7 +227,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
map(elem if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem).
transform("transform", () new StatefulStage[Int, Int] {
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) = ctx.push(elem)
}
@ -253,7 +253,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"support cancel as expected" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new StatefulStage[Int, Int] {
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) =
emit(Iterator(elem, elem), ctx)
@ -275,7 +275,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"support producing elements from empty inputs" in {
val p = Source(List.empty[Int]).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new StatefulStage[Int, Int] {
transform(() new StatefulStage[Int, Int] {
override def initial = new State {
override def onPush(elem: Int, ctx: Context[Int]) = ctx.pull()
}
@ -296,7 +296,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
"support converting onComplete into onError" in {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source(List(5, 1, 2, 3)).transform("transform", () new PushStage[Int, Int] {
Source(List(5, 1, 2, 3)).transform(() new PushStage[Int, Int] {
var expectedNumberOfElements: Option[Int] = None
var count = 0
override def onPush(elem: Int, ctx: Context[Int]) =
@ -326,7 +326,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
"be safe to reuse" in {
val flow = Source(1 to 3).transform("transform", ()
val flow = Source(1 to 3).transform(()
new PushStage[Int, Int] {
var count = 0

View file

@ -19,7 +19,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
"produce scheduled ticks as expected" in {
val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Source(p).
timerTransform("timer", () new TimerTransformer[Int, Int] {
timerTransform(() new TimerTransformer[Int, Int] {
schedulePeriodically("tick", 100.millis)
var tickCount = 0
override def onNext(elem: Int) = List(elem)
@ -44,7 +44,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
"schedule ticks when last transformation step (consume)" in {
val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Source(p).
timerTransform("timer", () new TimerTransformer[Int, Int] {
timerTransform(() new TimerTransformer[Int, Int] {
schedulePeriodically("tick", 100.millis)
var tickCount = 0
override def onNext(elem: Int) = List(elem)
@ -68,7 +68,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
val exception = new Exception("Expected exception to the rule") with NoStackTrace
val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Source(p).
timerTransform("timer", () new TimerTransformer[Int, Int] {
timerTransform(() new TimerTransformer[Int, Int] {
scheduleOnce("tick", 100.millis)
def onNext(element: Int) = Nil

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import scala.concurrent.duration._
import scala.concurrent.Await
class GraphJunctionAttributesSpec extends AkkaSpec {
implicit val set = MaterializerSettings(system).withInputBuffer(4, 4)
implicit val mat = FlowMaterializer(set)
"A zip" should {
"take custom inputBuffer settings" in {
sealed abstract class SlowTick
case object SlowTick extends SlowTick
sealed abstract class FastTick
case object FastTick extends FastTick
val source = Source[(SlowTick, List[FastTick])]() { implicit b
import FlowGraphImplicits._
val slow = Source(0.seconds, 100.millis, () SlowTick)
val fast = Source(0.seconds, 10.millis, () FastTick)
val sink = UndefinedSink[(SlowTick, List[FastTick])]
val zip = Zip[SlowTick, List[FastTick]](inputBuffer(1, 1))
slow ~> zip.left
fast.conflate(tick List(tick)) { case (list, tick) tick :: list } ~> zip.right
zip.out ~> sink
sink
}
val future = source.grouped(10).runWith(Sink.head)
// FIXME #16435 drop(2) needed because first two SlowTicks get only one FastTick
Await.result(future, 2.seconds).map(_._2.size).filter(_ == 1).drop(2) should be(Nil)
}
}
}

View file

@ -3,6 +3,7 @@
*/
package akka.stream.scaladsl
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.impl.{ Optimizations, ActorBasedFlowMaterializer }
import akka.stream.testkit.AkkaSpec
@ -20,7 +21,7 @@ class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSen
val f = Source(1 to 100).
drop(4).
drop(5).
transform("identity", () FlowOps.identityStage).
section(name("identity"))(_.transform(() FlowOps.identityStage)).
filter(_ % 2 == 0).
map(_ * 2).
map(identity).

View file

@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.existentials
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Flow
import akka.stream.stage._
@ -22,28 +23,41 @@ private[akka] trait TimedOps {
/**
* INTERNAL API
*
* Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`.
* Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`.
*/
def timed[I, O](flow: Source[I], measuredOps: Source[I] Source[O], onComplete: FiniteDuration Unit): Source[O] = {
val ctx = new TimedFlowContext
val startWithTime = flow.transform("startTimed", () new StartTimedFlow(ctx))
val userFlow = measuredOps(startWithTime)
userFlow.transform("stopTimed", () new StopTimed(ctx, onComplete))
val startTimed = (f: Source[I]) f.transform(() new StartTimedFlow(ctx))
val stopTimed = (f: Source[O]) f.transform(() new StopTimed(ctx, onComplete))
val measured = ((s: Source[I]) s) andThen
(_.section(name("startTimed"))(startTimed)) andThen
measuredOps andThen
(_.section(name("stopTimed"))(stopTimed))
measured(flow)
}
/**
* INTERNAL API
*
* Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`.
* Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`.
*/
def timed[I, O, Out](flow: Flow[I, O], measuredOps: Flow[I, O] Flow[O, Out], onComplete: FiniteDuration Unit): Flow[O, Out] = {
// todo is there any other way to provide this for Flow, without duplicating impl? (they don't share any super-type)
// todo is there any other way to provide this for Flow, without duplicating impl?
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type
val ctx = new TimedFlowContext
val startWithTime: Flow[I, O] = flow.transform("startTimed", () new StartTimedFlow(ctx))
val userFlow: Flow[O, Out] = measuredOps(startWithTime)
userFlow.transform("stopTimed", () new StopTimed(ctx, onComplete))
val startTimed = (f: Flow[I, O]) f.transform(() new StartTimedFlow(ctx))
val stopTimed = (f: Flow[O, Out]) f.transform(() new StopTimed(ctx, onComplete))
val measured = ((f: Flow[I, O]) f) andThen
(_.section(name("startTimed"))(startTimed)) andThen
measuredOps andThen
(_.section(name("stopTimed"))(stopTimed))
measured(flow)
}
}
@ -58,18 +72,23 @@ private[akka] trait TimedIntervalBetweenOps {
import Timed._
/**
* Measures rolling interval between immediatly subsequent `matching(o: O)` elements.
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
*/
def timedIntervalBetween[O](flow: Source[O], matching: O Boolean, onInterval: FiniteDuration Unit): Source[O] = {
flow.transform("timedInterval", () new TimedIntervalTransformer[O](matching, onInterval))
flow.section(name("timedInterval")) {
_.transform(() new TimedIntervalTransformer[O](matching, onInterval))
}
}
/**
* Measures rolling interval between immediatly subsequent `matching(o: O)` elements.
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
*/
def timedIntervalBetween[I, O](flow: Flow[I, O], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[I, O] = {
// todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type)
flow.transform("timedInterval", () new TimedIntervalTransformer[O](matching, onInterval))
// todo is there any other way to provide this for Flow / Duct, without duplicating impl?
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type
flow.section(name("timedInterval")) {
_.transform(() new TimedIntervalTransformer[O](matching, onInterval))
}
}
}

View file

@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicLong
import akka.dispatch.Dispatchers
import akka.event.Logging
import akka.stream.impl.fusing.ActorInterpreter
import akka.stream.scaladsl.OperationAttributes._
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ Promise, ExecutionContext, Await, Future }
@ -25,82 +26,171 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber }
* INTERNAL API
*/
private[akka] object Ast {
sealed abstract class AstNode {
def name: String
def attributes: OperationAttributes
def withAttributes(attributes: OperationAttributes): AstNode
}
final case class TimerTransform(mkStage: () TimerTransformer[Any, Any], override val name: String) extends AstNode
object Defaults {
val timerTransform = name("timerTransform")
val stageFactory = name("stageFactory")
val fused = name("fused")
val map = name("map")
val filter = name("filter")
val collect = name("collect")
val mapAsync = name("mapAsync")
val mapAsyncUnordered = name("mapAsyncUnordered")
val grouped = name("grouped")
val take = name("take")
val drop = name("drop")
val scan = name("scan")
val buffer = name("buffer")
val conflate = name("conflate")
val expand = name("expand")
val mapConcat = name("mapConcat")
val groupBy = name("groupBy")
val prefixAndTail = name("prefixAndTail")
val splitWhen = name("splitWhen")
val concatAll = name("concatAll")
val processor = name("processor")
val processorWithKey = name("processorWithKey")
val identityOp = name("identityOp")
final case class StageFactory(mkStage: () Stage[_, _], override val name: String) extends AstNode
val merge = name("merge")
val mergePreferred = name("mergePreferred")
val broadcast = name("broadcast")
val balance = name("balance")
val zip = name("zip")
val unzip = name("unzip")
val concat = name("concat")
val flexiMerge = name("flexiMerge")
val flexiRoute = name("flexiRoute")
val identityJunction = name("identityJunction")
}
import Defaults._
final case class TimerTransform(mkStage: () TimerTransformer[Any, Any], attributes: OperationAttributes = timerTransform) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class StageFactory(mkStage: () Stage[_, _], attributes: OperationAttributes = stageFactory) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
object Fused {
def apply(ops: immutable.Seq[Stage[_, _]]): Fused =
Fused(ops, ops.map(x Logging.simpleName(x).toLowerCase).mkString("+")) //FIXME change to something more performant for name
Fused(ops, name(ops.map(x Logging.simpleName(x).toLowerCase).mkString("+"))) //FIXME change to something more performant for name
}
final case class Fused(ops: immutable.Seq[Stage[_, _]], attributes: OperationAttributes = fused) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Fused(ops: immutable.Seq[Stage[_, _]], override val name: String) extends AstNode
final case class Map(f: Any Any) extends AstNode { override def name = "map" }
final case class Map(f: Any Any, attributes: OperationAttributes = map) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Filter(p: Any Boolean) extends AstNode { override def name = "filter" }
final case class Filter(p: Any Boolean, attributes: OperationAttributes = filter) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Collect(pf: PartialFunction[Any, Any]) extends AstNode { override def name = "collect" }
final case class Collect(pf: PartialFunction[Any, Any], attributes: OperationAttributes = collect) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
// FIXME Replace with OperateAsync
final case class MapAsync(f: Any Future[Any]) extends AstNode { override def name = "mapAsync" }
final case class MapAsync(f: Any Future[Any], attributes: OperationAttributes = mapAsync) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
//FIXME Should be OperateUnorderedAsync
final case class MapAsyncUnordered(f: Any Future[Any]) extends AstNode { override def name = "mapAsyncUnordered" }
final case class MapAsyncUnordered(f: Any Future[Any], attributes: OperationAttributes = mapAsyncUnordered) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Grouped(n: Int) extends AstNode {
final case class Grouped(n: Int, attributes: OperationAttributes = grouped) extends AstNode {
require(n > 0, "n must be greater than 0")
override def name = "grouped"
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
//FIXME should be `n: Long`
final case class Take(n: Int) extends AstNode {
override def name = "take"
final case class Take(n: Int, attributes: OperationAttributes = take) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
//FIXME should be `n: Long`
final case class Drop(n: Int) extends AstNode {
override def name = "drop"
final case class Drop(n: Int, attributes: OperationAttributes = drop) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Scan(zero: Any, f: (Any, Any) Any) extends AstNode { override def name = "scan" }
final case class Scan(zero: Any, f: (Any, Any) Any, attributes: OperationAttributes = scan) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode {
final case class Buffer(size: Int, overflowStrategy: OverflowStrategy, attributes: OperationAttributes = buffer) extends AstNode {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
override def name = "buffer"
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Conflate(seed: Any Any, aggregate: (Any, Any) Any) extends AstNode {
override def name = "conflate"
final case class Conflate(seed: Any Any, aggregate: (Any, Any) Any, attributes: OperationAttributes = conflate) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class Expand(seed: Any Any, extrapolate: Any (Any, Any)) extends AstNode {
override def name = "expand"
final case class Expand(seed: Any Any, extrapolate: Any (Any, Any), attributes: OperationAttributes = expand) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class MapConcat(f: Any immutable.Seq[Any]) extends AstNode {
override def name = "mapConcat"
final case class MapConcat(f: Any immutable.Seq[Any], attributes: OperationAttributes = mapConcat) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class GroupBy(f: Any Any) extends AstNode { override def name = "groupBy" }
final case class PrefixAndTail(n: Int) extends AstNode { override def name = "prefixAndTail" }
final case class SplitWhen(p: Any Boolean) extends AstNode { override def name = "splitWhen" }
final case object ConcatAll extends AstNode {
override def name = "concatFlatten"
final case class GroupBy(f: Any Any, attributes: OperationAttributes = groupBy) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
case class DirectProcessor(p: () Processor[Any, Any]) extends AstNode {
override def name = "processor"
final case class PrefixAndTail(n: Int, attributes: OperationAttributes = prefixAndTail) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
case class DirectProcessorWithKey(p: () (Processor[Any, Any], Any), key: Key) extends AstNode {
override def name = "processorWithKey"
final case class SplitWhen(p: Any Boolean, attributes: OperationAttributes = splitWhen) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
final case class ConcatAll(attributes: OperationAttributes = concatAll) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
case class DirectProcessor(p: () Processor[Any, Any], attributes: OperationAttributes = processor) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
case class DirectProcessorWithKey(p: () (Processor[Any, Any], Any), key: Key, attributes: OperationAttributes = processorWithKey) extends AstNode {
def withAttributes(attributes: OperationAttributes) =
copy(attributes = attributes)
}
sealed trait JunctionAstNode {
def name: String
def attributes: OperationAttributes
}
// FIXME: Try to eliminate these
@ -108,46 +198,21 @@ private[akka] object Ast {
sealed trait FanOutAstNode extends JunctionAstNode
// FIXME Why do we need this?
case class IdentityAstNode(id: Int) extends JunctionAstNode {
override val name = s"identity$id"
}
case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode
case object Merge extends FanInAstNode {
override def name = "merge"
}
final case class Merge(attributes: OperationAttributes) extends FanInAstNode
final case class MergePreferred(attributes: OperationAttributes) extends FanInAstNode
case object MergePreferred extends FanInAstNode {
override def name = "mergePreferred"
}
final case class Broadcast(attributes: OperationAttributes) extends FanOutAstNode
final case class Balance(waitForAllDownstreams: Boolean, attributes: OperationAttributes) extends FanOutAstNode
case object Broadcast extends FanOutAstNode {
override def name = "broadcast"
}
final case class Zip(as: ZipAs, attributes: OperationAttributes) extends FanInAstNode
final case class Unzip(attributes: OperationAttributes) extends FanOutAstNode
case class Balance(waitForAllDownstreams: Boolean) extends FanOutAstNode {
override def name = "balance"
}
final case class Zip(as: ZipAs) extends FanInAstNode {
override def name = "zip"
}
case object Unzip extends FanOutAstNode {
override def name = "unzip"
}
case object Concat extends FanInAstNode {
override def name = "concat"
}
case class FlexiMergeNode(factory: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode {
override def name = factory.name.getOrElse("flexiMerge")
}
case class FlexiRouteNode(factory: FlexiRouteImpl.RouteLogicFactory[Any]) extends FanOutAstNode {
override def name = factory.name.getOrElse("flexiRoute")
}
final case class Concat(attributes: OperationAttributes) extends FanInAstNode
final case class FlexiMergeNode(factory: FlexiMergeImpl.MergeLogicFactory[Any], attributes: OperationAttributes) extends FanInAstNode
final case class FlexiRouteNode(factory: FlexiRouteImpl.RouteLogicFactory[Any], attributes: OperationAttributes) extends FanOutAstNode
}
/**
@ -216,12 +281,12 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
rest match {
case noMatch if !optimizations.elision || (noMatch ne orig) orig
//Collapses consecutive Take's into one
case (t1 @ Ast.Take(t1n)) :: (t2 @ Ast.Take(t2n)) :: rest (if (t1n < t2n) t1 else t2) :: rest
case (t1: Ast.Take) :: (t2: Ast.Take) :: rest (if (t1.n < t2.n) t1 else t2) :: rest
//Collapses consecutive Drop's into one
case (d1 @ Ast.Drop(d1n)) :: (d2 @ Ast.Drop(d2n)) :: rest new Ast.Drop(d1n + d2n) :: rest
case (d1: Ast.Drop) :: (d2: Ast.Drop) :: rest new Ast.Drop(d1.n + d2.n, d1.attributes and d2.attributes) :: rest
case Ast.Drop(n) :: rest if n < 1 rest // a 0 or negative drop is a NoOp
case Ast.Drop(n, _) :: rest if n < 1 rest // a 0 or negative drop is a NoOp
case noMatch noMatch
}
@ -231,7 +296,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
case noMatch if !optimizations.simplification || (noMatch ne orig) orig
// Two consecutive maps is equivalent to one pipelined map
case Ast.Map(second) :: Ast.Map(first) :: rest Ast.Map(first andThen second) :: rest
case (second: Ast.Map) :: (first: Ast.Map) :: rest Ast.Map(first.f andThen second.f, first.attributes and second.attributes) :: rest
case noMatch noMatch
}
@ -242,7 +307,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
case noMatch if !optimizations.collapsing || (noMatch ne orig) orig
// Collapses a filter and a map into a collect
case Ast.Map(f) :: Ast.Filter(p) :: rest Ast.Collect({ case i if p(i) f(i) }) :: rest
case (map: Ast.Map) :: (fil: Ast.Filter) :: rest Ast.Collect({ case i if fil.p(i) map.f(i) }) :: rest
case noMatch noMatch
}
@ -256,17 +321,17 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
// Optimizations below
case noMatch if !optimizations.fusion prev
case Ast.Map(f) fusing.Map(f) :: prev
case Ast.Filter(p) fusing.Filter(p) :: prev
case Ast.Drop(n) fusing.Drop(n) :: prev
case Ast.Take(n) fusing.Take(n) :: prev
case Ast.Collect(pf) fusing.Collect(pf) :: prev
case Ast.Scan(z, f) fusing.Scan(z, f) :: prev
case Ast.Expand(s, f) fusing.Expand(s, f) :: prev
case Ast.Conflate(s, f) fusing.Conflate(s, f) :: prev
case Ast.Buffer(n, s) fusing.Buffer(n, s) :: prev
case Ast.MapConcat(f) fusing.MapConcat(f) :: prev
case Ast.Grouped(n) fusing.Grouped(n) :: prev
case Ast.Map(f, _) fusing.Map(f) :: prev
case Ast.Filter(p, _) fusing.Filter(p) :: prev
case Ast.Drop(n, _) fusing.Drop(n) :: prev
case Ast.Take(n, _) fusing.Take(n) :: prev
case Ast.Collect(pf, _) fusing.Collect(pf) :: prev
case Ast.Scan(z, f, _) fusing.Scan(z, f) :: prev
case Ast.Expand(s, f, _) fusing.Expand(s, f) :: prev
case Ast.Conflate(s, f, _) fusing.Conflate(s, f) :: prev
case Ast.Buffer(n, s, _) fusing.Buffer(n, s) :: prev
case Ast.MapConcat(f, _) fusing.MapConcat(f) :: prev
case Ast.Grouped(n, _) fusing.Grouped(n) :: prev
//FIXME Add more fusion goodies here
case _ prev
}
@ -354,7 +419,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
}
}
//FIXME Should this be a dedicated AstNode?
private[this] val identityStageNode = Ast.StageFactory(() FlowOps.identityStage[Any], "identity")
private[this] val identityStageNode = Ast.StageFactory(() FlowOps.identityStage[Any], Ast.Defaults.identityOp)
def executionContext: ExecutionContext = dispatchers.lookup(settings.dispatcher match {
case Deploy.NoDispatcherGiven Dispatchers.DefaultDispatcherId
@ -366,23 +431,32 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
*/
private[akka] def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): (Processor[In, Out], MaterializedMap) = op match {
// FIXME #16376 should probably be replaced with an ActorFlowProcessor similar to ActorFlowSource/Sink
case Ast.DirectProcessor(p) (p().asInstanceOf[Processor[In, Out]], MaterializedMap.empty)
case Ast.DirectProcessorWithKey(p, key)
case Ast.DirectProcessor(p, _) (p().asInstanceOf[Processor[In, Out]], MaterializedMap.empty)
case Ast.DirectProcessorWithKey(p, key, _)
val (processor, value) = p()
(processor.asInstanceOf[Processor[In, Out]], MaterializedMap.empty.updated(key, value))
case _
(ActorProcessorFactory[In, Out](actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}")), MaterializedMap.empty)
(ActorProcessorFactory[In, Out](actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.attributes.name}", op)), MaterializedMap.empty)
}
def actorOf(props: Props, name: String): ActorRef = supervisor match {
private[akka] def actorOf(props: Props, name: String): ActorRef =
actorOf(props, name, settings.dispatcher)
private[akka] def actorOf(props: Props, name: String, ast: Ast.JunctionAstNode): ActorRef =
actorOf(props, name, ast.attributes.settings(settings).dispatcher)
private[akka] def actorOf(props: Props, name: String, ast: AstNode): ActorRef =
actorOf(props, name, ast.attributes.settings(settings).dispatcher)
private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = supervisor match {
case ref: LocalActorRef
ref.underlying.attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false)
ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false)
case ref: RepointableActorRef
if (ref.isStarted)
ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false)
ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), name, systemService = false)
else {
implicit val timeout = ref.system.settings.CreationTimeout
val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(settings.dispatcher), name)).mapTo[ActorRef]
val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), name)).mapTo[ActorRef]
Await.result(f, timeout.duration)
}
case unknown
@ -390,17 +464,20 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
}
// FIXME Investigate possibility of using `enableOperationsFusion` in `materializeJunction`
override def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = {
val actorName = s"${createFlowName()}-${op.name}"
val actorName = s"${createFlowName()}-${op.attributes.name}"
val transformedSettings = op.attributes.settings(settings)
op match {
case fanin: Ast.FanInAstNode
val impl = fanin match {
case Ast.Merge actorOf(FairMerge.props(settings, inputCount), actorName)
case Ast.MergePreferred actorOf(UnfairMerge.props(settings, inputCount), actorName)
case zip: Ast.Zip actorOf(Zip.props(settings, zip.as), actorName)
case Ast.Concat actorOf(Concat.props(settings), actorName)
case Ast.FlexiMergeNode(merger) actorOf(FlexiMergeImpl.props(settings, inputCount, merger.createMergeLogic()), actorName)
val props = fanin match {
case Ast.Merge(_) FairMerge.props(transformedSettings, inputCount)
case Ast.MergePreferred(_) UnfairMerge.props(transformedSettings, inputCount)
case Ast.Zip(as, _) Zip.props(transformedSettings, as)
case Ast.Concat(_) Concat.props(transformedSettings)
case Ast.FlexiMergeNode(merger, _) FlexiMergeImpl.props(transformedSettings, inputCount, merger.createMergeLogic())
}
val impl = actorOf(props, actorName, fanin)
val publisher = new ActorPublisher[Out](impl)
impl ! ExposedPublisher(publisher.asInstanceOf[ActorPublisher[Any]])
@ -408,12 +485,13 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
(subscribers, List(publisher))
case fanout: Ast.FanOutAstNode
val impl = fanout match {
case Ast.Broadcast actorOf(Broadcast.props(settings, outputCount), actorName)
case Ast.Balance(waitForAllDownstreams) actorOf(Balance.props(settings, outputCount, waitForAllDownstreams), actorName)
case Ast.Unzip actorOf(Unzip.props(settings), actorName)
case Ast.FlexiRouteNode(route) actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()), actorName)
val props = fanout match {
case Ast.Broadcast(_) Broadcast.props(transformedSettings, outputCount)
case Ast.Balance(waitForAllDownstreams, _) Balance.props(transformedSettings, outputCount, waitForAllDownstreams)
case Ast.Unzip(_) Unzip.props(transformedSettings)
case Ast.FlexiRouteNode(route, _) FlexiRouteImpl.props(transformedSettings, outputCount, route.createRouteLogic())
}
val impl = actorOf(props, actorName, fanout)
val publishers = Vector.tabulate(outputCount)(id new ActorPublisher[Out](impl) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
@ -422,9 +500,9 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
val subscriber = ActorSubscriber[In](impl)
(List(subscriber), publishers)
case identity @ Ast.IdentityAstNode(_) // FIXME Why is IdentityAstNode a JunctionAStNode?
case identity @ Ast.IdentityAstNode(attr) // FIXME Why is IdentityAstNode a JunctionAStNode?
// We can safely ignore the materialized map that gets created here since it will be empty
val id = List(processorForNode[In, Out](identityStageNode, identity.name, 1)._1) // FIXME is `identity.name` appropriate/unique here?
val id = List(processorForNode[In, Out](identityStageNode, attr.name, 1)._1) // FIXME is `identity.name` appropriate/unique here?
(id, id)
}
@ -477,28 +555,28 @@ private[akka] object ActorProcessorFactory {
import Ast._
def props(materializer: FlowMaterializer, op: AstNode): Props = {
val settings = materializer.settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
(op match {
op match {
case Fused(ops, _) ActorInterpreter.props(settings, ops)
case Map(f) ActorInterpreter.props(settings, List(fusing.Map(f)))
case Filter(p) ActorInterpreter.props(settings, List(fusing.Filter(p)))
case Drop(n) ActorInterpreter.props(settings, List(fusing.Drop(n)))
case Take(n) ActorInterpreter.props(settings, List(fusing.Take(n)))
case Collect(pf) ActorInterpreter.props(settings, List(fusing.Collect(pf)))
case Scan(z, f) ActorInterpreter.props(settings, List(fusing.Scan(z, f)))
case Expand(s, f) ActorInterpreter.props(settings, List(fusing.Expand(s, f)))
case Conflate(s, f) ActorInterpreter.props(settings, List(fusing.Conflate(s, f)))
case Buffer(n, s) ActorInterpreter.props(settings, List(fusing.Buffer(n, s)))
case MapConcat(f) ActorInterpreter.props(settings, List(fusing.MapConcat(f)))
case MapAsync(f) MapAsyncProcessorImpl.props(settings, f)
case MapAsyncUnordered(f) MapAsyncUnorderedProcessorImpl.props(settings, f)
case Grouped(n) ActorInterpreter.props(settings, List(fusing.Grouped(n)))
case GroupBy(f) GroupByProcessorImpl.props(settings, f)
case PrefixAndTail(n) PrefixAndTailImpl.props(settings, n)
case SplitWhen(p) SplitWhenProcessorImpl.props(settings, p)
case ConcatAll ConcatAllImpl.props(materializer) //FIXME closes over the materializer, is this good?
case Map(f, _) ActorInterpreter.props(settings, List(fusing.Map(f)))
case Filter(p, _) ActorInterpreter.props(settings, List(fusing.Filter(p)))
case Drop(n, _) ActorInterpreter.props(settings, List(fusing.Drop(n)))
case Take(n, _) ActorInterpreter.props(settings, List(fusing.Take(n)))
case Collect(pf, _) ActorInterpreter.props(settings, List(fusing.Collect(pf)))
case Scan(z, f, _) ActorInterpreter.props(settings, List(fusing.Scan(z, f)))
case Expand(s, f, _) ActorInterpreter.props(settings, List(fusing.Expand(s, f)))
case Conflate(s, f, _) ActorInterpreter.props(settings, List(fusing.Conflate(s, f)))
case Buffer(n, s, _) ActorInterpreter.props(settings, List(fusing.Buffer(n, s)))
case MapConcat(f, _) ActorInterpreter.props(settings, List(fusing.MapConcat(f)))
case MapAsync(f, _) MapAsyncProcessorImpl.props(settings, f)
case MapAsyncUnordered(f, _) MapAsyncUnorderedProcessorImpl.props(settings, f)
case Grouped(n, _) ActorInterpreter.props(settings, List(fusing.Grouped(n)))
case GroupBy(f, _) GroupByProcessorImpl.props(settings, f)
case PrefixAndTail(n, _) PrefixAndTailImpl.props(settings, n)
case SplitWhen(p, _) SplitWhenProcessorImpl.props(settings, p)
case ConcatAll(_) ConcatAllImpl.props(materializer) //FIXME closes over the materializer, is this good?
case StageFactory(mkStage, _) ActorInterpreter.props(settings, List(mkStage()))
case TimerTransform(mkStage, _) TimerTransformerProcessorsImpl.props(settings, mkStage())
}).withDispatcher(settings.dispatcher)
}
}
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {

View file

@ -5,6 +5,7 @@ package akka.stream.impl
import akka.actor.Props
import akka.stream.MaterializerSettings
import akka.stream.scaladsl.OperationAttributes
import akka.stream.scaladsl.FlexiMerge
import scala.collection.breakOut
@ -17,7 +18,7 @@ private[akka] object FlexiMergeImpl {
Props(new FlexiMergeImpl(settings, inputCount, mergeLogic))
trait MergeLogicFactory[Out] {
def name: Option[String]
def attributes: OperationAttributes
def createMergeLogic(): FlexiMerge.MergeLogic[Out]
}
}

View file

@ -3,6 +3,8 @@
*/
package akka.stream.impl
import akka.stream.scaladsl.OperationAttributes
import scala.collection.breakOut
import akka.actor.Props
import akka.stream.scaladsl.FlexiRoute
@ -17,7 +19,7 @@ private[akka] object FlexiRouteImpl {
Props(new FlexiRouteImpl(settings, outputCount, routeLogic))
trait RouteLogicFactory[In] {
def name: Option[String]
def attributes: OperationAttributes
def createRouteLogic(): FlexiRoute.RouteLogic[In]
}
}

View file

@ -9,6 +9,7 @@ import akka.stream.scaladsl.FlexiMerge.ReadAllInputsBase
import scala.collection.immutable
import java.util.{ List JList }
import akka.japi.Util.immutableIndexedSeq
import akka.stream.impl.Ast.Defaults._
import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory
object FlexiMerge {
@ -337,20 +338,20 @@ object FlexiMerge {
*
* @param name optional name of the junction in the [[FlowGraph]],
*/
abstract class FlexiMerge[In, Out](val name: Option[String]) {
abstract class FlexiMerge[In, Out](val attributes: OperationAttributes) {
import FlexiMerge._
import scaladsl.FlowGraphInternal
import akka.stream.impl.Ast
def this() = this(None)
def this(name: String) = this(Option(name))
def this(name: String) = this(OperationAttributes.name(name))
def this() = this(OperationAttributes.none)
private var inputCount = 0
def createMergeLogic(): MergeLogic[In, Out]
// hide the internal vertex things from subclass, and make it possible to create new instance
private class FlexiMergeVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex {
private class FlexiMergeVertex(override val attributes: scaladsl.OperationAttributes) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount = 2
override def maximumInputCount = inputCount
override def minimumOutputCount = 1
@ -358,22 +359,20 @@ abstract class FlexiMerge[In, Out](val name: Option[String]) {
override private[akka] val astNode = {
val factory = new MergeLogicFactory[Any] {
override def name: Option[String] = vertexName
override def attributes: scaladsl.OperationAttributes = FlexiMergeVertex.this.attributes
override def createMergeLogic(): scaladsl.FlexiMerge.MergeLogic[Any] =
new Internal.MergeLogicWrapper(FlexiMerge.this.createMergeLogic().asInstanceOf[MergeLogic[Any, Any]])
}
Ast.FlexiMergeNode(factory)
Ast.FlexiMergeNode(factory, flexiMerge and attributes)
}
override def name = vertexName
final override def newInstance() = new FlexiMergeVertex(None)
final override def newInstance() = new FlexiMergeVertex(attributes.withoutName)
}
/**
* INTERNAL API
*/
private[akka] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(name)
private[akka] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(attributes.asScala)
/**
* Output port of the `FlexiMerge` junction. A [[Sink]] can be connected to this output
@ -399,4 +398,8 @@ abstract class FlexiMerge[In, Out](val name: Option[String]) {
new InputPort(port, parent = this)
}
override def toString = attributes.asScala.nameLifted match {
case Some(n) n
case None getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode())
}
}

View file

@ -8,6 +8,7 @@ import akka.stream.scaladsl
import scala.collection.immutable
import java.util.{ List JList }
import akka.japi.Util.immutableIndexedSeq
import akka.stream.impl.Ast.Defaults._
import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory
object FlexiRoute {
@ -304,18 +305,18 @@ object FlexiRoute {
*
* @param name optional name of the junction in the [[FlowGraph]],
*/
abstract class FlexiRoute[In, Out](val name: Option[String]) {
abstract class FlexiRoute[In, Out](val attributes: OperationAttributes) {
import FlexiRoute._
import scaladsl.FlowGraphInternal
import akka.stream.impl.Ast
def this() = this(None)
def this(name: String) = this(Option(name))
def this(name: String) = this(OperationAttributes.name(name))
def this() = this(OperationAttributes.none)
private var outputCount = 0
// hide the internal vertex things from subclass, and make it possible to create new instance
private class RouteVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex {
private class RouteVertex(override val attributes: scaladsl.OperationAttributes) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount = 1
override def maximumInputCount = 1
override def minimumOutputCount = 2
@ -323,22 +324,20 @@ abstract class FlexiRoute[In, Out](val name: Option[String]) {
override private[akka] val astNode = {
val factory = new RouteLogicFactory[Any] {
override def name: Option[String] = vertexName
override def attributes: scaladsl.OperationAttributes = RouteVertex.this.attributes
override def createRouteLogic(): scaladsl.FlexiRoute.RouteLogic[Any] =
new Internal.RouteLogicWrapper(FlexiRoute.this.createRouteLogic().asInstanceOf[RouteLogic[Any, Any]])
}
Ast.FlexiRouteNode(factory)
Ast.FlexiRouteNode(factory, flexiRoute and attributes)
}
override def name = vertexName
final override def newInstance() = new RouteVertex(None)
final override def newInstance() = new RouteVertex(attributes.withoutName)
}
/**
* INTERNAL API
*/
private[akka] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(name)
private[akka] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(attributes.asScala)
/**
* Input port of the `FlexiRoute` junction. A [[Source]] can be connected to this output
@ -371,7 +370,7 @@ abstract class FlexiRoute[In, Out](val name: Option[String]) {
*/
def createRouteLogic(): RouteLogic[In, Out]
override def toString = name match {
override def toString = attributes.asScala.nameLifted match {
case Some(n) n
case None getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode())
}

View file

@ -301,8 +301,8 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
def transform[U](name: String, mkStage: japi.Creator[Stage[Out, U]]): javadsl.Flow[In, U] =
new Flow(delegate.transform(name, () mkStage.create()))
def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Flow[In, U] =
new Flow(delegate.transform(() mkStage.create()))
/**
* Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element
@ -363,6 +363,16 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
*/
def withKey[T](key: javadsl.Key[T]): Flow[In, Out] =
new Flow(delegate.withKey(key.asScala))
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[I <: In, O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[In, Out], javadsl.Flow[I, O]]): javadsl.Flow[I, O] =
new Flow(delegate.section(attributes.asScala) {
val scalaToJava = (flow: scaladsl.Flow[In, Out]) new javadsl.Flow[In, Out](flow)
val javaToScala = (flow: javadsl.Flow[I, O]) flow.asScala
scalaToJava andThen section.apply andThen javaToScala
})
}
/**

View file

@ -56,7 +56,7 @@ object Merge {
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def create[T](name: String): Merge[T] = new Merge(new scaladsl.Merge[T](Option(name)))
def create[T](name: String): Merge[T] = new Merge(new scaladsl.Merge[T](OperationAttributes.name(name).asScala))
/**
* Create a named `Merge` vertex with the specified output type.
@ -101,7 +101,7 @@ object MergePreferred {
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def create[T](name: String): MergePreferred[T] = new MergePreferred(new scaladsl.MergePreferred[T](Option(name)))
def create[T](name: String): MergePreferred[T] = new MergePreferred(new scaladsl.MergePreferred[T](OperationAttributes.name(name).asScala))
/**
* Create a named `MergePreferred` vertex with the specified output type.
@ -146,7 +146,7 @@ object Broadcast {
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def create[T](name: String): Broadcast[T] = new Broadcast(new scaladsl.Broadcast(Option(name)))
def create[T](name: String): Broadcast[T] = new Broadcast(new scaladsl.Broadcast(OperationAttributes.name(name).asScala))
/**
* Create a named `Broadcast` vertex with the specified input type.
@ -190,7 +190,7 @@ object Balance {
* returns instances that are `equal`.
*/
def create[T](name: String): Balance[T] =
new Balance(new scaladsl.Balance(Option(name), waitForAllDownstreams = false))
new Balance(new scaladsl.Balance(waitForAllDownstreams = false, OperationAttributes.name(name).asScala))
/**
* Create a named `Balance` vertex with the specified input type.
@ -214,7 +214,7 @@ class Balance[T](delegate: scaladsl.Balance[T]) extends javadsl.Junction[T] {
* elements to downstream outputs until all of them have requested at least one element.
*/
def withWaitForAllDowstreams(enabled: Boolean): Balance[T] =
new Balance(new scaladsl.Balance(delegate.name, delegate.waitForAllDownstreams))
new Balance(new scaladsl.Balance(delegate.waitForAllDownstreams, delegate.attributes))
}
object Zip {
@ -242,8 +242,8 @@ object Zip {
* is called and those instances are not `equal`.*
*/
def create[A, B](name: String): Zip[A, B] =
new Zip(new scaladsl.Zip[A, B](Option(name)) {
override private[akka] def astNode: Ast.FanInAstNode = Ast.Zip(impl.Zip.AsJavaPair)
new Zip(new scaladsl.Zip[A, B](OperationAttributes.name(name).asScala) {
override private[akka] def astNode: Ast.FanInAstNode = Ast.Zip(impl.Zip.AsJavaPair, attributes)
})
/**
@ -287,7 +287,7 @@ object Unzip {
def create[A, B](): Unzip[A, B] = create(name = null)
def create[A, B](name: String): Unzip[A, B] =
new Unzip[A, B](new scaladsl.Unzip[A, B](Option(name)))
new Unzip[A, B](new scaladsl.Unzip[A, B](OperationAttributes.name(name).asScala))
def create[A, B](left: Class[A], right: Class[B]): Unzip[A, B] =
create[A, B]()
@ -389,7 +389,7 @@ object UndefinedSource {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def create[T](): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](None))
def create[T](): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](scaladsl.OperationAttributes.none))
/**
* Create a new anonymous `Undefinedsource` vertex with the specified input type.
@ -405,7 +405,7 @@ object UndefinedSource {
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def create[T](name: String): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](Option(name)))
def create[T](name: String): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](OperationAttributes.name(name).asScala))
/**
* Create a named `Undefinedsource` vertex with the specified input type.
@ -448,7 +448,7 @@ object UndefinedSink {
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def create[T](name: String): UndefinedSink[T] = new UndefinedSink[T](new scaladsl.UndefinedSink[T](Option(name)))
def create[T](name: String): UndefinedSink[T] = new UndefinedSink[T](new scaladsl.UndefinedSink[T](OperationAttributes.name(name).asScala))
/**
* Create a named `Undefinedsink` vertex with the specified input type.

View file

@ -0,0 +1,60 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl
import akka.stream.scaladsl
/**
* Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]]
* materialization.
*/
abstract class OperationAttributes private () {
private[akka] def asScala: scaladsl.OperationAttributes
/**
* Adds given attributes to the end of these attributes.
*/
def and(other: OperationAttributes) = new OperationAttributes {
private[akka] def asScala = this.asScala and other.asScala
}
}
/**
* Various attributes that can be applied to [[Flow]] or [[FlowGraph]]
* materialization.
*/
object OperationAttributes {
/**
* Specifies the name of the operation.
*/
def name(name: String): OperationAttributes = new OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.name(name)
}
/**
* Specifies the initial and maximum size of the input buffer.
*/
def inputBuffer(initial: Int, max: Int): OperationAttributes = new OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.inputBuffer(initial, max)
}
/**
* Specifies the initial and maximum size of the fan out buffer.
*/
def fanOutBuffer(initial: Int, max: Int): OperationAttributes = new OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.fanOutBuffer(initial, max)
}
/**
* Specifies the name of the dispatcher.
*/
def dispatcher(dispatcher: String): OperationAttributes = new OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.dispatcher(dispatcher)
}
private[akka] val none: OperationAttributes = new OperationAttributes {
private[akka] def asScala = scaladsl.OperationAttributes.none
}
}

View file

@ -390,8 +390,8 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
def transform[U](name: String, mkStage: japi.Creator[Stage[Out, U]]): javadsl.Source[U] =
new Source(delegate.transform(name, () mkStage.create()))
def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Source[U] =
new Source(delegate.transform(() mkStage.create()))
/**
* Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element
@ -445,6 +445,16 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
*/
def withKey[T](key: javadsl.Key[T]): javadsl.Source[Out] =
new Source(delegate.withKey(key.asScala))
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Source[Out], javadsl.Source[O]]): javadsl.Source[O] =
new Source(delegate.section(attributes.asScala) {
val scalaToJava = (source: scaladsl.Source[Out]) new javadsl.Source[Out](source)
val javaToScala = (source: javadsl.Source[O]) source.asScala
scalaToJava andThen section.apply andThen javaToScala
})
}
/**

View file

@ -10,9 +10,10 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.impl.{ ActorBasedFlowMaterializer, ActorProcessorFactory, FanoutProcessorImpl, BlackholeSubscriber }
import java.util.concurrent.atomic.AtomicReference
import akka.stream.stage._
import java.util.concurrent.atomic.AtomicReference
sealed trait ActorFlowSink[-In] extends Sink[In] {
@ -170,8 +171,8 @@ object OnCompleteSink {
*/
final case class OnCompleteSink[In](callback: Try[Unit] Unit) extends SimpleActorFlowSink[In] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) =
Source(flowPublisher).transform("onCompleteSink", () new PushStage[In, Unit] {
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val section = (s: Source[In]) s.transform(() new PushStage[In, Unit] {
override def onPush(elem: In, ctx: Context[Unit]): Directive = ctx.pull()
override def onUpstreamFailure(cause: Throwable, ctx: Context[Unit]): TerminationDirective = {
callback(Failure(cause))
@ -181,7 +182,13 @@ final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends Simple
callback(OnCompleteSink.SuccessUnit)
ctx.finish()
}
}).to(BlackholeSink).run()(materializer.withNamePrefix(flowName))
})
Source(flowPublisher).
section(name("onCompleteSink"))(section).
to(BlackholeSink).
run()(materializer.withNamePrefix(flowName))
}
}
/**
@ -195,7 +202,7 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In]
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val promise = Promise[Unit]()
Source(flowPublisher).transform("foreach", () new PushStage[In, Unit] {
val section = (s: Source[In]) s.transform(() new PushStage[In, Unit] {
override def onPush(elem: In, ctx: Context[Unit]): Directive = {
f(elem)
ctx.pull()
@ -208,7 +215,12 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In]
promise.success(())
ctx.finish()
}
}).to(BlackholeSink).run()(materializer.withNamePrefix(flowName))
})
Source(flowPublisher).
section(name("foreach"))(section).
to(BlackholeSink).
run()(materializer.withNamePrefix(flowName))
promise.future
}
}
@ -226,8 +238,7 @@ final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFl
override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = {
val promise = Promise[U]()
Source(flowPublisher).transform("fold", () new PushStage[In, U] {
val section = (s: Source[In]) s.transform(() new PushStage[In, U] {
private var aggregator = zero
override def onPush(elem: In, ctx: Context[U]): Directive = {
@ -244,8 +255,12 @@ final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFl
promise.success(aggregator)
ctx.finish()
}
}).to(BlackholeSink).run()(materializer.withNamePrefix(flowName))
})
Source(flowPublisher).
section(name("fold"))(section).
to(BlackholeSink).
run()(materializer.withNamePrefix(flowName))
promise.future
}
}

View file

@ -62,6 +62,8 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] {
/** INTERNAL API */
override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op), Nil) //FIXME raw addition of AstNodes
def withAttributes(attr: OperationAttributes) = SourcePipe(this, Nil, Nil, attr)
}
/**

View file

@ -5,7 +5,9 @@ package akka.stream.scaladsl
import scala.annotation.varargs
import scala.collection.immutable
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.impl.Ast
import akka.stream.impl.Ast.Defaults._
import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory
object FlexiMerge {
@ -235,28 +237,27 @@ object FlexiMerge {
*
* @param name optional name of the junction in the [[FlowGraph]],
*/
abstract class FlexiMerge[Out](val name: Option[String]) extends MergeLogicFactory[Out] {
abstract class FlexiMerge[Out](override val attributes: OperationAttributes) extends MergeLogicFactory[Out] {
import FlexiMerge._
def this(name: String) = this(Some(name))
def this() = this(None)
def this(name: String) = this(OperationAttributes.name(name))
def this() = this(OperationAttributes.none)
private var inputCount = 0
// hide the internal vertex things from subclass, and make it possible to create new instance
private class FlexiMergeVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex {
private class FlexiMergeVertex(override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount = 2
override def maximumInputCount = inputCount
override def minimumOutputCount = 1
override def maximumOutputCount = 1
override private[akka] val astNode = Ast.FlexiMergeNode(FlexiMerge.this.asInstanceOf[FlexiMerge[Any]])
override def name = vertexName
override private[akka] val astNode = Ast.FlexiMergeNode(FlexiMerge.this.asInstanceOf[FlexiMerge[Any]], flexiMerge and attributes)
final override private[scaladsl] def newInstance() = new FlexiMergeVertex(None)
final override private[scaladsl] def newInstance() = new FlexiMergeVertex(attributes.withoutName)
}
private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(name)
private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(attributes)
/**
* Output port of the `FlexiMerge` junction. A [[Sink]] can be connected to this output
@ -286,7 +287,7 @@ abstract class FlexiMerge[Out](val name: Option[String]) extends MergeLogicFacto
*/
override def createMergeLogic(): MergeLogic[Out]
override def toString = name match {
override def toString = attributes.nameLifted match {
case Some(n) n
case None getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode())
}

View file

@ -4,7 +4,9 @@
package akka.stream.scaladsl
import scala.collection.immutable
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.impl.Ast
import akka.stream.impl.Ast.Defaults._
import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory
object FlexiRoute {
@ -211,28 +213,27 @@ object FlexiRoute {
*
* @param name optional name of the junction in the [[FlowGraph]],
*/
abstract class FlexiRoute[In](val name: Option[String]) extends RouteLogicFactory[In] {
abstract class FlexiRoute[In](override val attributes: OperationAttributes) extends RouteLogicFactory[In] {
import FlexiRoute._
def this(name: String) = this(Some(name))
def this() = this(None)
def this(name: String) = this(OperationAttributes.name(name))
def this() = this(OperationAttributes.none)
private var outputCount = 0
// hide the internal vertex things from subclass, and make it possible to create new instance
private class RouteVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex {
private class RouteVertex(override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount = 1
override def maximumInputCount = 1
override def minimumOutputCount = 2
override def maximumOutputCount = outputCount
override private[akka] val astNode = Ast.FlexiRouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]])
override def name = vertexName
override private[akka] val astNode = Ast.FlexiRouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]], flexiRoute and attributes)
final override private[scaladsl] def newInstance() = new RouteVertex(None)
final override private[scaladsl] def newInstance() = new RouteVertex(OperationAttributes.none)
}
private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(name)
private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(attributes)
/**
* Input port of the `FlexiRoute` junction. A [[Source]] can be connected to this output
@ -263,7 +264,7 @@ abstract class FlexiRoute[In](val name: Option[String]) extends RouteLogicFactor
*/
override def createRouteLogic(): RouteLogic[In]
override def toString = name match {
override def toString = attributes.nameLifted match {
case Some(n) n
case None getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode())
}

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl
import akka.stream.impl.Ast._
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.{ TimerTransformer, TransformerLike, OverflowStrategy }
import akka.util.Collections.EmptyImmutableSeq
import scala.collection.immutable
@ -70,6 +71,13 @@ trait Flow[-In, +Out] extends FlowOps[Out] {
* before this key.
*/
def withKey(key: Key): Flow[In, Out]
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[I <: In, O](attributes: OperationAttributes)(section: Flow[In, Out] Flow[I, O]): Flow[I, O] =
section(this.withAttributes(attributes)).withAttributes(OperationAttributes.none)
}
object Flow {
@ -122,7 +130,7 @@ trait RunnableFlow {
*/
trait FlowOps[+Out] {
import FlowOps._
type Repr[+O]
type Repr[+O] <: FlowOps[O]
/**
* Transform this stream by applying the given function to each of the elements
@ -199,10 +207,10 @@ trait FlowOps[+Out] {
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = {
def groupedWithin(n: Int, d: FiniteDuration): Repr[Out]#Repr[immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
timerTransform("groupedWithin", () new TimerTransformer[Out, immutable.Seq[Out]] {
withAttributes(name("groupedWithin")).timerTransform(() new TimerTransformer[Out, immutable.Seq[Out]] {
schedulePeriodically(GroupedWithinTimerKey, d)
var buf: Vector[Out] = Vector.empty
@ -235,8 +243,8 @@ trait FlowOps[+Out] {
/**
* Discard the elements received within the given duration at beginning of the stream.
*/
def dropWithin(d: FiniteDuration): Repr[Out] =
timerTransform("dropWithin", () new TimerTransformer[Out, Out] {
def dropWithin(d: FiniteDuration): Repr[Out]#Repr[Out] =
withAttributes(name("dropWithin")).timerTransform(() new TimerTransformer[Out, Out] {
scheduleOnce(DropWithinTimerKey, d)
var delegate: TransformerLike[Out, Out] =
@ -271,8 +279,8 @@ trait FlowOps[+Out] {
* Note that this can be combined with [[#take]] to limit the number of elements
* within the duration.
*/
def takeWithin(d: FiniteDuration): Repr[Out] =
timerTransform("takeWithin", () new TimerTransformer[Out, Out] {
def takeWithin(d: FiniteDuration): Repr[Out]#Repr[Out] =
withAttributes(name("takeWithin")).timerTransform(() new TimerTransformer[Out, Out] {
scheduleOnce(TakeWithinTimerKey, d)
var delegate: TransformerLike[Out, Out] = FlowOps.identityTransformer[Out]
@ -331,8 +339,8 @@ trait FlowOps[+Out] {
* This operator makes it possible to extend the `Flow` API when there is no specialized
* operator that performs the transformation.
*/
def transform[T](name: String, mkStage: () Stage[Out, T]): Repr[T] =
andThen(StageFactory(mkStage, name))
def transform[T](mkStage: () Stage[Out, T]): Repr[T] =
andThen(StageFactory(mkStage))
/**
* Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element
@ -377,7 +385,7 @@ trait FlowOps[+Out] {
* This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]].
*/
def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U] = strategy match {
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll)
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll())
case _
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
}
@ -408,8 +416,11 @@ trait FlowOps[+Out] {
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
private[akka] def timerTransform[U](name: String, mkStage: () TimerTransformer[Out, U]): Repr[U] =
andThen(TimerTransform(mkStage.asInstanceOf[() TimerTransformer[Any, Any]], name))
private[akka] def timerTransform[U](mkStage: () TimerTransformer[Out, U]): Repr[U] =
andThen(TimerTransform(mkStage.asInstanceOf[() TimerTransformer[Any, Any]]))
/** INTERNAL API */
private[scaladsl] def withAttributes(attr: OperationAttributes): Repr[Out]
/** INTERNAL API */
// Storing ops in reverse order
@ -440,4 +451,3 @@ private[stream] object FlowOps {
override def onPush(elem: T, ctx: Context[T]): Directive = ctx.push(elem)
}
}

View file

@ -9,6 +9,8 @@ import akka.stream.FlowMaterializer
import akka.stream.impl.Ast
import akka.stream.impl.Ast.FanInAstNode
import akka.stream.impl.{ DirectedGraphBuilder, Edge }
import akka.stream.impl.Ast.Defaults._
import akka.stream.scaladsl.OperationAttributes._
import org.reactivestreams._
import scala.language.existentials
@ -59,11 +61,11 @@ private[akka] object Identity {
def getId: Int = id.getAndIncrement
}
private[akka] final class Identity[T]() extends FlowGraphInternal.InternalVertex with Junction[T] {
private[akka] final class Identity[T](override val attributes: OperationAttributes = OperationAttributes.none) extends FlowGraphInternal.InternalVertex with Junction[T] {
import Identity._
// This vertex can not have a name or else there can only be one instance in the whole graph
def name: Option[String] = None
override def name: Option[String] = None
override private[akka] val vertex = this
override val minimumInputCount: Int = 1
@ -71,9 +73,9 @@ private[akka] final class Identity[T]() extends FlowGraphInternal.InternalVertex
override val minimumOutputCount: Int = 1
override val maximumOutputCount: Int = 1
override private[akka] val astNode = Ast.IdentityAstNode(getId)
override private[akka] val astNode = Ast.IdentityAstNode(identityJunction and OperationAttributes.name(s"id$getId"))
final override private[scaladsl] def newInstance() = new Identity[T]()
final override private[scaladsl] def newInstance() = new Identity[T](attributes.withoutName)
}
object Merge {
@ -83,14 +85,16 @@ object Merge {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: Merge[T] = new Merge[T](None)
def apply[T]: Merge[T] = new Merge[T](OperationAttributes.none)
/**
* Create a named `Merge` vertex with the specified output type.
* Note that a `Merge` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): Merge[T] = new Merge[T](Some(name))
def apply[T](name: String): Merge[T] = new Merge[T](OperationAttributes.name(name))
def apply[T](attributes: OperationAttributes): Merge[T] = new Merge[T](attributes)
}
/**
@ -100,16 +104,16 @@ object Merge {
* When building the [[FlowGraph]] you must connect one or more input sources
* and one output sink to the `Merge` vertex.
*/
final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
final class Merge[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] {
override private[akka] val vertex = this
override val minimumInputCount: Int = 2
override val maximumInputCount: Int = Int.MaxValue
override val minimumOutputCount: Int = 1
override val maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Merge
override private[akka] def astNode = Ast.Merge(merge and attributes)
final override private[scaladsl] def newInstance() = new Merge[T](None)
final override private[scaladsl] def newInstance() = new Merge[T](attributes.withoutName)
}
object MergePreferred {
@ -124,14 +128,16 @@ object MergePreferred {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: MergePreferred[T] = new MergePreferred[T](None)
def apply[T]: MergePreferred[T] = new MergePreferred[T](OperationAttributes.none)
/**
* Create a named `MergePreferred` vertex with the specified output type.
* Note that a `MergePreferred` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): MergePreferred[T] = new MergePreferred[T](Some(name))
def apply[T](name: String): MergePreferred[T] = new MergePreferred[T](OperationAttributes.name(name))
def apply[T](attributes: OperationAttributes): MergePreferred[T] = new MergePreferred[T](attributes)
class Preferred[A] private[akka] (private[akka] val vertex: MergePreferred[A]) extends JunctionInPort[A] {
override private[akka] def port = PreferredPort
@ -146,7 +152,7 @@ object MergePreferred {
* When building the [[FlowGraph]] you must connect one or more input sources
* and one output sink to the `Merge` vertex.
*/
final class MergePreferred[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
final class MergePreferred[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] {
val preferred = new MergePreferred.Preferred(this)
@ -156,9 +162,9 @@ final class MergePreferred[T](override val name: Option[String]) extends FlowGra
override val minimumOutputCount: Int = 1
override val maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.MergePreferred
override private[akka] def astNode = Ast.MergePreferred(mergePreferred and attributes)
final override private[scaladsl] def newInstance() = new MergePreferred[T](None)
final override private[scaladsl] def newInstance() = new MergePreferred[T](attributes.withoutName)
}
object Broadcast {
@ -168,14 +174,16 @@ object Broadcast {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: Broadcast[T] = new Broadcast[T](None)
def apply[T]: Broadcast[T] = new Broadcast[T](OperationAttributes.none)
/**
* Create a named `Broadcast` vertex with the specified input type.
* Note that a `Broadcast` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): Broadcast[T] = new Broadcast[T](Some(name))
def apply[T](name: String): Broadcast[T] = new Broadcast[T](OperationAttributes.name(name))
def apply[T](attributes: OperationAttributes): Broadcast[T] = new Broadcast[T](attributes)
}
/**
@ -183,16 +191,16 @@ object Broadcast {
* the other streams. It will not shutdown until the subscriptions for at least
* two downstream subscribers have been established.
*/
final class Broadcast[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] {
final class Broadcast[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] {
override private[akka] def vertex = this
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 2
override def maximumOutputCount: Int = Int.MaxValue
override private[akka] def astNode = Ast.Broadcast
override private[akka] def astNode = Ast.Broadcast(broadcast and attributes)
final override private[scaladsl] def newInstance() = new Broadcast[T](None)
final override private[scaladsl] def newInstance() = new Broadcast[T](attributes.withoutName)
}
object Balance {
@ -202,7 +210,7 @@ object Balance {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: Balance[T] = new Balance[T](None, waitForAllDownstreams = false)
def apply[T]: Balance[T] = new Balance[T](waitForAllDownstreams = false, OperationAttributes.none)
/**
* Create a named `Balance` vertex with the specified input type.
* Note that a `Balance` with a specific name can only be used at one place (one vertex)
@ -212,7 +220,7 @@ object Balance {
* If you use `waitForAllDownstreams = true` it will not start emitting
* elements to downstream outputs until all of them have requested at least one element.
*/
def apply[T](name: String, waitForAllDownstreams: Boolean = false): Balance[T] = new Balance[T](Some(name), waitForAllDownstreams)
def apply[T](name: String, waitForAllDownstreams: Boolean = false): Balance[T] = new Balance[T](waitForAllDownstreams, OperationAttributes.name(name))
/**
* Create a new anonymous `Balance` vertex with the specified input type.
@ -223,7 +231,9 @@ object Balance {
* If you use `waitForAllDownstreams = true` it will not start emitting
* elements to downstream outputs until all of them have requested at least one element.
*/
def apply[T](waitForAllDownstreams: Boolean): Balance[T] = new Balance[T](None, waitForAllDownstreams)
def apply[T](waitForAllDownstreams: Boolean): Balance[T] = new Balance[T](waitForAllDownstreams, OperationAttributes.none)
def apply[T](waitForAllDownstreams: Boolean, attributes: OperationAttributes): Balance[T] = new Balance[T](waitForAllDownstreams, attributes)
}
/**
@ -231,16 +241,16 @@ object Balance {
* one of the other streams. It will not shutdown until the subscriptions for at least
* two downstream subscribers have been established.
*/
final class Balance[T](override val name: Option[String], val waitForAllDownstreams: Boolean) extends FlowGraphInternal.InternalVertex with Junction[T] {
final class Balance[T](val waitForAllDownstreams: Boolean, override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] {
override private[akka] def vertex = this
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
override def minimumOutputCount: Int = 2
override def maximumOutputCount: Int = Int.MaxValue
override private[akka] val astNode = Ast.Balance(waitForAllDownstreams)
override private[akka] val astNode = Ast.Balance(waitForAllDownstreams, balance and attributes)
final override private[scaladsl] def newInstance() = new Balance[T](None, waitForAllDownstreams)
final override private[scaladsl] def newInstance() = new Balance[T](waitForAllDownstreams, attributes.withoutName)
}
object Zip {
@ -250,7 +260,7 @@ object Zip {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[A, B]: Zip[A, B] = new Zip[A, B](None)
def apply[A, B]: Zip[A, B] = new Zip[A, B](OperationAttributes.none)
/**
* Create a named `Zip` vertex with the specified input types.
@ -258,7 +268,9 @@ object Zip {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](Some(name))
def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](OperationAttributes.name(name))
def apply[A, B](attr: OperationAttributes): Zip[A, B] = new Zip[A, B](attr)
class Left[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[A] {
override private[akka] def port = 0
@ -278,7 +290,7 @@ object Zip {
* by combining corresponding elements in pairs. If one of the two streams is
* longer than the other, its remaining elements are ignored.
*/
private[akka] class Zip[A, B](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
private[akka] class Zip[A, B](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex {
import akka.stream.impl.Zip.AsScalaTuple2
val left = new Zip.Left(this)
@ -290,9 +302,9 @@ private[akka] class Zip[A, B](override val name: Option[String]) extends FlowGra
override def minimumOutputCount: Int = 1
override def maximumOutputCount: Int = 1
override private[akka] def astNode: FanInAstNode = Ast.Zip(AsScalaTuple2)
override private[akka] def astNode: FanInAstNode = Ast.Zip(AsScalaTuple2, zip and attributes)
final override private[scaladsl] def newInstance() = new Zip[A, B](name = None)
final override private[scaladsl] def newInstance() = new Zip[A, B](attributes.withoutName)
}
object Unzip {
@ -302,7 +314,7 @@ object Unzip {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[A, B]: Unzip[A, B] = new Unzip[A, B](None)
def apply[A, B]: Unzip[A, B] = new Unzip[A, B](OperationAttributes.none)
/**
* Create a named `Unzip` vertex with the specified output types.
@ -310,7 +322,9 @@ object Unzip {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[A, B](name: String): Unzip[A, B] = new Unzip[A, B](Some(name))
def apply[A, B](name: String): Unzip[A, B] = new Unzip[A, B](OperationAttributes.name(name))
def apply[A, B](attributes: OperationAttributes): Unzip[A, B] = new Unzip[A, B](attributes)
class In[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionInPort[(A, B)] {
override type NextT = Nothing
@ -328,7 +342,7 @@ object Unzip {
/**
* Takes a stream of pair elements and splits each pair to two output streams.
*/
final class Unzip[A, B](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class Unzip[A, B](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex {
val in = new Unzip.In(this)
val left = new Unzip.Left(this)
val right = new Unzip.Right(this)
@ -338,9 +352,9 @@ final class Unzip[A, B](override val name: Option[String]) extends FlowGraphInte
override def minimumOutputCount: Int = 2
override def maximumOutputCount: Int = 2
override private[akka] def astNode = Ast.Unzip
override private[akka] def astNode = Ast.Unzip(unzip and attributes)
final override private[scaladsl] def newInstance() = new Unzip[A, B](name = None)
final override private[scaladsl] def newInstance() = new Unzip[A, B](attributes.withoutName)
}
object Concat {
@ -350,7 +364,7 @@ object Concat {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[T]: Concat[T] = new Concat[T](None)
def apply[T]: Concat[T] = new Concat[T](OperationAttributes.none)
/**
* Create a named `Concat` vertex with the specified input types.
@ -358,7 +372,9 @@ object Concat {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.*
*/
def apply[T](name: String): Concat[T] = new Concat[T](Some(name))
def apply[T](name: String): Concat[T] = new Concat[T](OperationAttributes.name(name))
def apply[T](attributes: OperationAttributes): Concat[T] = new Concat[T](attributes)
class First[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] {
override val port = 0
@ -378,7 +394,7 @@ object Concat {
* by consuming one stream first emitting all of its elements, then consuming the
* second stream emitting all of its elements.
*/
final class Concat[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class Concat[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex {
val first = new Concat.First(this)
val second = new Concat.Second(this)
val out = new Concat.Out(this)
@ -388,9 +404,9 @@ final class Concat[T](override val name: Option[String]) extends FlowGraphIntern
override def minimumOutputCount: Int = 1
override def maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.Concat
override private[akka] def astNode = Ast.Concat(concat and attributes)
final override private[scaladsl] def newInstance() = new Concat[T](name = None)
final override private[scaladsl] def newInstance() = new Concat[T](attributes.withoutName)
}
object UndefinedSink {
@ -400,21 +416,21 @@ object UndefinedSink {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: UndefinedSink[T] = new UndefinedSink[T](None)
def apply[T]: UndefinedSink[T] = new UndefinedSink[T](OperationAttributes.none)
/**
* Create a named `UndefinedSink` vertex with the specified input type.
* Note that a `UndefinedSink` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](Some(name))
def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](OperationAttributes.name(name))
}
/**
* It is possible to define a [[PartialFlowGraph]] with output pipes that are not connected
* yet by using this placeholder instead of the real [[Sink]]. Later the placeholder can
* be replaced with [[FlowGraphBuilder#attachSink]].
*/
final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class UndefinedSink[-T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
@ -423,7 +439,7 @@ final class UndefinedSink[-T](override val name: Option[String]) extends FlowGra
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sinks cannot be materialized")
final override private[scaladsl] def newInstance() = new UndefinedSink[T](name = None)
final override private[scaladsl] def newInstance() = new UndefinedSink[T](attributes.withoutName)
}
object UndefinedSource {
@ -433,21 +449,21 @@ object UndefinedSource {
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[T]: UndefinedSource[T] = new UndefinedSource[T](None)
def apply[T]: UndefinedSource[T] = new UndefinedSource[T](OperationAttributes.none)
/**
* Create a named `UndefinedSource` vertex with the specified output type.
* Note that a `UndefinedSource` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. Calling this method several times with the same name
* returns instances that are `equal`.
*/
def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](Some(name))
def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](OperationAttributes.name(name))
}
/**
* It is possible to define a [[PartialFlowGraph]] with input pipes that are not connected
* yet by using this placeholder instead of the real [[Source]]. Later the placeholder can
* be replaced with [[FlowGraphBuilder#attachSource]].
*/
final class UndefinedSource[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex {
final class UndefinedSource[+T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount: Int = 0
override def maximumInputCount: Int = 0
override def minimumOutputCount: Int = 1
@ -455,7 +471,7 @@ final class UndefinedSource[+T](override val name: Option[String]) extends FlowG
override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sources cannot be materialized")
final override private[scaladsl] def newInstance() = new UndefinedSource[T](name = None)
final override private[scaladsl] def newInstance() = new UndefinedSource[T](attributes.withoutName)
}
/**
@ -518,7 +534,8 @@ private[akka] object FlowGraphInternal {
}
trait InternalVertex extends Vertex {
def name: Option[String]
def attributes: OperationAttributes
def name: Option[String] = attributes.nameLifted
def minimumInputCount: Int
def maximumInputCount: Int
@ -1138,7 +1155,7 @@ class FlowGraphBuilder private[akka] (
node.outDegree <= v.maximumOutputCount,
s"$v must have at most ${v.maximumOutputCount} outgoing edges")
v.astNode match {
case Ast.MergePreferred
case Ast.MergePreferred(_)
require(
node.incoming.count(_.label.inputPort == MergePreferred.PreferredPort) <= 1,
s"$v must have at most one preferred edge")

View file

@ -119,6 +119,8 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](
override def withKey(key: Key): Flow[In, Out] = this.copy(outPipe = outPipe.withKey(key))
override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))
def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr))
}
private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] {
@ -164,6 +166,8 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou
override def withKey(key: Key): Source[Out] = this.copy(outPipe = outPipe.withKey(key))
override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op))
def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr))
}
private[scaladsl] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] {

View file

@ -0,0 +1,90 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.MaterializerSettings
import akka.stream.impl.Ast.AstNode
/**
* Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]]
* materialization.
*/
case class OperationAttributes private (private val attributes: List[OperationAttributes.Attribute] = Nil) {
import OperationAttributes._
/**
* Adds given attributes to the end of these attributes.
*/
def and(other: OperationAttributes): OperationAttributes = {
OperationAttributes(attributes ::: other.attributes)
}
private[akka] def nameLifted: Option[String] =
attributes.collect {
case Name(name) name
}.reduceOption(_ + "-" + _)
private[akka] def name: String = nameLifted match {
case Some(name) name
case _ "unknown-operation"
}
private[akka] def settings: MaterializerSettings MaterializerSettings =
attributes.collect {
case InputBuffer(initial, max) (s: MaterializerSettings) s.withInputBuffer(initial, max)
case FanOutBuffer(initial, max) (s: MaterializerSettings) s.withFanOutBuffer(initial, max)
case Dispatcher(dispatcher) (s: MaterializerSettings) s.withDispatcher(dispatcher)
}.reduceOption(_ andThen _).getOrElse(identity)
private[akka] def transform(node: AstNode): AstNode =
if ((this eq OperationAttributes.none) || (this eq node.attributes)) node
else node.withAttributes(attributes = this and node.attributes)
/**
* Filtering out name attributes is needed for Vertex.newInstance().
* However there is an ongoing discussion for removing this feature,
* after which this will not be needed anymore.
*
* https://github.com/akka/akka/issues/16392
*/
private[akka] def withoutName = this.copy(
attributes = attributes.filterNot {
case attr: Name true
})
}
object OperationAttributes {
private[OperationAttributes] trait Attribute
private[OperationAttributes] case class Name(n: String) extends Attribute
private[OperationAttributes] case class InputBuffer(initial: Int, max: Int) extends Attribute
private[OperationAttributes] case class FanOutBuffer(initial: Int, max: Int) extends Attribute
private[OperationAttributes] case class Dispatcher(dispatcher: String) extends Attribute
private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes =
apply(List(attribute))
private[akka] val none: OperationAttributes = OperationAttributes()
/**
* Specifies the name of the operation.
*/
def name(name: String): OperationAttributes = OperationAttributes(Name(name))
/**
* Specifies the initial and maximum size of the input buffer.
*/
def inputBuffer(initial: Int, max: Int): OperationAttributes = OperationAttributes(InputBuffer(initial, max))
/**
* Specifies the initial and maximum size of the fan out buffer.
*/
def fanOutBuffer(initial: Int, max: Int): OperationAttributes = OperationAttributes(FanOutBuffer(initial, max))
/**
* Specifies the name of the dispatcher.
*/
def dispatcher(dispatcher: String): OperationAttributes = OperationAttributes(Dispatcher(dispatcher))
}

View file

@ -33,10 +33,12 @@ private[akka] object Pipe {
/**
* Flow with one open input and one open output.
*/
private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key]) extends Flow[In, Out] {
private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Flow[In, Out] {
override type Repr[+O] = Pipe[In @uncheckedVariance, O]
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = Pipe(ops = op :: ops, keys) // FIXME raw addition of AstNodes
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = Pipe(ops = attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes
def withAttributes(attr: OperationAttributes): Repr[Out] = this.copy(attributes = attr)
private[stream] def withSink(out: Sink[Out]): SinkPipe[In] = SinkPipe(out, ops, keys)
@ -79,10 +81,12 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod
/**
* Pipe with open output and attached input. Can be used as a `Publisher`.
*/
private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key]) extends Source[Out] {
private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Source[Out] {
override type Repr[+O] = SourcePipe[O]
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops, keys) // FIXME raw addition of AstNodes
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes
def withAttributes(attr: OperationAttributes): Repr[Out] = this.copy(attributes = attr)
private[stream] def withSink(out: Sink[Out]): RunnablePipe = RunnablePipe(input, out, ops, keys)

View file

@ -79,6 +79,13 @@ trait Source[+Out] extends FlowOps[Out] {
* before this key. This also includes the keyed source if applicable.
*/
def withKey(key: Key): Source[Out]
/**
* Applies given [[OperationAttributes]] to a given section.
*/
def section[T](attributes: OperationAttributes)(section: Source[Out] Source[T]): Source[T] =
section(this.withAttributes(attributes)).withAttributes(OperationAttributes.none)
}
object Source {