diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index eaa057ebb3..71e5984464 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -22,7 +22,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( - Ast.OpFactory(() ⇒ akka.stream.impl.fusing.Map[Int, Int](identity), "identity"), flowName, 1) + Ast.Fusable(Vector(akka.stream.impl.fusing.Map[Int, Int](identity)), "identity"), flowName, 1) processor.asInstanceOf[Processor[Int, Int]] } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 5c05fef2b6..78c9ccc403 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -14,7 +14,7 @@ import akka.actor.{ Props, ActorRefFactory, ActorRef } import akka.stream.{ TransformerLike, MaterializerSettings } import akka.stream.FlowMaterializer import akka.stream.impl.{ ActorProcessorFactory, TransformProcessorImpl, StreamSupervisor, ActorBasedFlowMaterializer } -import akka.stream.impl.Ast.{ Transform, OpFactory, AstNode } +import akka.stream.impl.Ast.{ Transform, Fusable, AstNode } import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import akka.stream.testkit.ChainSetup import akka.testkit._ @@ -72,9 +72,9 @@ object FlowSpec { override def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val props = op match { - case t: Transform ⇒ Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage)) - case OpFactory(mkOps, _) ⇒ Props(new BrokenActorInterpreter(settings, mkOps.map(_.apply()), brokenMessage)).withDispatcher(settings.dispatcher) - case o ⇒ ActorProcessorFactory.props(this, o) + case t: Transform ⇒ Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage)) + case f: Fusable ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher) + case o ⇒ ActorProcessorFactory.props(this, o) } val impl = actorOf(props, s"$flowName-$n-${op.name}") ActorProcessorFactory(impl) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index cff72c9c93..4247c5525c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -43,12 +43,7 @@ private[akka] object Ast { case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode - object OpFactory { - def apply(mkOp: () ⇒ Op[_, _, _, _, _], name: String): OpFactory = - OpFactory(List(mkOp), name) - } - - case class OpFactory(mkOps: List[() ⇒ Op[_, _, _, _, _]], name: String) extends AstNode + case class Fusable(ops: immutable.Seq[Op[_, _, _, _, _]], name: String) extends AstNode case class MapAsync(f: Any ⇒ Future[Any]) extends AstNode { override def name = "mapAsync" @@ -331,7 +326,7 @@ private[akka] object ActorProcessorFactory { def props(materializer: FlowMaterializer, op: AstNode): Props = { val settings = materializer.settings (op match { - case OpFactory(mkOps, _) ⇒ Props(new ActorInterpreter(materializer.settings, mkOps.map(_.apply()))) + case Fusable(ops, _) ⇒ Props(new ActorInterpreter(materializer.settings, ops)) case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) case m: MapAsync ⇒ Props(new MapAsyncProcessorImpl(settings, m.f)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b3a8ce342b..8bf3810a9b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -108,13 +108,13 @@ trait FlowOps[+Out] { * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. */ - def map[T](f: Out ⇒ T): Repr[T] = andThen(OpFactory(() ⇒ fusing.Map(f), "map")) + def map[T](f: Out ⇒ T): Repr[T] = andThen(Fusable(Vector(fusing.Map(f)), "map")) /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. */ - def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(OpFactory(() ⇒ fusing.MapConcat(f), "mapConcat")) + def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(Fusable(Vector(fusing.MapConcat(f)), "mapConcat")) /** * Transform this stream by applying the given function to each of the elements @@ -144,16 +144,16 @@ trait FlowOps[+Out] { /** * Only pass on those elements that satisfy the given predicate. */ - def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(OpFactory(() ⇒ fusing.Filter(p), "filter")) + def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(Fusable(Vector(fusing.Filter(p)), "filter")) /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. * Non-matching elements are filtered out. */ - def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(OpFactory(List( - () ⇒ fusing.Filter(pf.isDefinedAt), - () ⇒ fusing.Map(pf.apply)), "filter")) + def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(Fusable(Vector( + fusing.Filter(pf.isDefinedAt), + fusing.Map(pf.apply)), "filter")) /** * Chunk up this stream into groups of the given size, with the last group @@ -163,7 +163,7 @@ trait FlowOps[+Out] { */ def grouped(n: Int): Repr[immutable.Seq[Out]] = { require(n > 0, "n must be greater than 0") - andThen(OpFactory(() ⇒ fusing.Grouped(n), "grouped")) + andThen(Fusable(Vector(fusing.Grouped(n)), "grouped")) } /** @@ -208,8 +208,8 @@ trait FlowOps[+Out] { * No elements will be dropped if `n` is zero or negative. */ def drop(n: Int): Repr[Out] = - if (n <= 0) andThen(OpFactory(Nil, "drop")) - else andThen(OpFactory(() ⇒ fusing.Drop(n), "drop")) + if (n <= 0) andThen(Fusable(Vector.empty, "drop")) + else andThen(Fusable(Vector(fusing.Drop(n)), "drop")) /** * Discard the elements received within the given duration at beginning of the stream. @@ -240,8 +240,8 @@ trait FlowOps[+Out] { * or negative. */ def take(n: Int): Repr[Out] = - if (n <= 0) andThen(OpFactory(() ⇒ fusing.Completed(), "take")) - else andThen(OpFactory(() ⇒ fusing.Take(n), "take")) + if (n <= 0) andThen(Fusable(Vector(fusing.Completed()), "take")) + else andThen(Fusable(Vector(fusing.Take(n)), "take")) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -278,7 +278,7 @@ trait FlowOps[+Out] { * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = - andThen(OpFactory(() ⇒ fusing.Conflate(seed, aggregate), "conflate")) + andThen(Fusable(Vector(fusing.Conflate(seed, aggregate)), "conflate")) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older @@ -294,7 +294,7 @@ trait FlowOps[+Out] { * state. */ def expand[S, U](seed: Out ⇒ S)(extrapolate: S ⇒ (U, S)): Repr[U] = - andThen(OpFactory(() ⇒ fusing.Expand(seed, extrapolate), "expand")) + andThen(Fusable(Vector(fusing.Expand(seed, extrapolate)), "expand")) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. @@ -306,7 +306,7 @@ trait FlowOps[+Out] { */ def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = { require(size > 0, s"Buffer size must be larger than zero but was [$size]") - andThen(OpFactory(() ⇒ fusing.Buffer(size, overflowStrategy), "buffer")) + andThen(Fusable(Vector(fusing.Buffer(size, overflowStrategy)), "buffer")) } /**