diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala index c73b36e5f4..d07014f28f 100644 --- a/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala @@ -44,14 +44,14 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin Duct[(HttpRequest, Any)] .broadcast(contextBypassSubscriber) .map(requestMethodByPass) - .transform(responseRendererFactory.newRenderer) + .transform("renderer", () ⇒ responseRendererFactory.newRenderer) .flatten(FlattenStrategy.concat) - .transform(errorLogger(log, "Outgoing request stream error")) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) .produceTo(tcpConn.outputStream)(materializer) val responsePublisher = Flow(tcpConn.inputStream) - .transform(rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass)) + .transform("rootParser", () ⇒ rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass)) .splitWhen(_.isInstanceOf[MessageStart]) .headAndTail(materializer) .collect { diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 999bdcec63..958655785b 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -43,7 +43,7 @@ sealed trait HttpEntity extends japi.HttpEntity { */ def toStrict(timeout: FiniteDuration, materializer: FlowMaterializer)(implicit ec: ExecutionContext): Future[HttpEntity.Strict] = Flow(dataBytes(materializer)) - .transform(new TimerTransformer[ByteString, HttpEntity.Strict] { + .timerTransform("toStrict", () ⇒ new TimerTransformer[ByteString, HttpEntity.Strict] { var bytes = ByteString.newBuilder scheduleOnce("", timeout) diff --git a/akka-http-core/src/main/scala/akka/http/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/rendering/HttpRequestRendererFactory.scala index 95375b42ba..82ff95ac76 100644 --- a/akka-http-core/src/main/scala/akka/http/rendering/HttpRequestRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/rendering/HttpRequestRendererFactory.scala @@ -106,12 +106,12 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.` case HttpEntity.Default(_, contentLength, data) ⇒ renderContentLength(contentLength) renderByteStrings(r, - Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer), + Flow(data).transform("checkContentLenght", () ⇒ new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer), materializer) case HttpEntity.Chunked(_, chunks) ⇒ r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf ~~ CrLf - renderByteStrings(r, Flow(chunks).transform(new ChunkTransformer).toPublisher()(materializer), materializer) + renderByteStrings(r, Flow(chunks).transform("chunkTransform", () ⇒ new ChunkTransformer).toPublisher()(materializer), materializer) } renderRequestLine() diff --git a/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala index c5bf94508a..6a3cbf425c 100644 --- a/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala @@ -132,7 +132,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser renderHeaders(headers.toList) renderEntityContentType(r, entity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf ~~ CrLf - byteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer)) + byteStrings(Flow(data).transform("checkContentLenght", () ⇒ new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer)) case HttpEntity.CloseDelimited(_, data) ⇒ renderHeaders(headers.toList, alwaysClose = true) @@ -149,7 +149,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser if (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf r ~~ CrLf - byteStrings(Flow(chunks).transform(new ChunkTransformer).toPublisher()(materializer)) + byteStrings(Flow(chunks).transform("checkContentLenght", () ⇒ new ChunkTransformer).toPublisher()(materializer)) } } diff --git a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala index 6513f45ef9..4367258372 100644 --- a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala @@ -40,7 +40,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings, val requestPublisher = Flow(tcpConn.inputStream) - .transform(rootParser.copyWith(warnOnIllegalHeader)) + .transform("rootParser", () ⇒ rootParser.copyWith(warnOnIllegalHeader)) // this will create extra single element `[MessageEnd]` substreams, that will // be filtered out by the above `collect` for the applicationBypass and the // below `collect` for the actual request handling @@ -58,10 +58,10 @@ private[http] class HttpServerPipeline(settings: ServerSettings, val responseSubscriber = Duct[HttpResponse] .merge(applicationBypassPublisher) - .transform(applyApplicationBypass) - .transform(responseRendererFactory.newRenderer) + .transform("applyApplicationBypass", () ⇒ applyApplicationBypass) + .transform("renderer", () ⇒ responseRendererFactory.newRenderer) .flatten(FlattenStrategy.concat) - .transform(errorLogger(log, "Outgoing response stream error")) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) .produceTo(tcpConn.outputStream)(materializer) Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber) diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index 1cc467e08b..5085221164 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -37,8 +37,8 @@ package object util { private[http] implicit class FlowWithPrintEvent[T](val underlying: Flow[T]) { def printEvent(marker: String): Flow[T] = - underlying.transform { - new Transformer[T, T] { + underlying.transform("transform", + () ⇒ new Transformer[T, T] { def onNext(element: T) = { println(s"$marker: $element") element :: Nil @@ -47,8 +47,7 @@ package object util { println(s"$marker: Terminated with error $e") Nil } - } - } + }) } private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = diff --git a/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala index 82ee4064ca..215ad311a8 100644 --- a/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala @@ -354,7 +354,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val future = Flow(input.toList) .map(ByteString.apply) - .transform(parser) + .transform("parser", () ⇒ parser) .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) .headAndTail(materializer) .collect { diff --git a/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala index dca2c7458d..5e62951dd5 100644 --- a/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala @@ -213,7 +213,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val future = Flow(input.toList) .map(ByteString.apply) - .transform(newParser(requestMethod)) + .transform("parser", () ⇒ newParser(requestMethod)) .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) .headAndTail(materializer) .collect { diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 2e2df5f56b..1930f6a81b 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -39,7 +39,7 @@ trait MultipartMarshallers { Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset) ⇒ val log = actorSystem(refFactory).log val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, fm, log) - val chunks = Flow(value.parts).transform(bodyPartRenderer).flatten(FlattenStrategy.concat).toPublisher()(fm) + val chunks = Flow(value.parts).transform("bodyPartRenderer", () ⇒ bodyPartRenderer).flatten(FlattenStrategy.concat).toPublisher()(fm) HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks) } } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index bc608b64ed..756f2e2bb8 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -40,7 +40,7 @@ trait MultipartUnmarshallers { case None ⇒ sys.error("Content-Type with a multipart media type must have a 'boundary' parameter") case Some(boundary) ⇒ val bodyParts = Flow(entity.dataBytes(fm)) - .transform(new BodyPartParser(defaultContentType, boundary, fm, actorSystem(refFactory).log)) + .transform("bodyPart", () ⇒ new BodyPartParser(defaultContentType, boundary, fm, actorSystem(refFactory).log)) .splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart]) .headAndTail(fm) .collect { diff --git a/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala b/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala index 8dc6d75232..476b5a9978 100644 --- a/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala +++ b/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala @@ -3,16 +3,15 @@ */ package akka.stream -import scala.collection.immutable -import scala.collection.mutable +import akka.actor.{ ActorContext, Cancellable } + +import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration -import akka.actor.ActorContext -import akka.actor.Cancellable /** * [[Transformer]] with support for scheduling keyed (named) timer events. */ -abstract class TimerTransformer[-T, +U] extends Transformer[T, U] { +abstract class TimerTransformer[-T, +U] extends TransformerLike[T, U] { import TimerTransformer._ private val timers = mutable.Map[Any, Timer]() private val timerIdGen = Iterator from 1 diff --git a/akka-stream/src/main/scala/akka/stream/Transformer.scala b/akka-stream/src/main/scala/akka/stream/Transformer.scala index 13e26b041c..5c2b898e26 100644 --- a/akka-stream/src/main/scala/akka/stream/Transformer.scala +++ b/akka-stream/src/main/scala/akka/stream/Transformer.scala @@ -5,18 +5,7 @@ package akka.stream import scala.collection.immutable -/** - * General interface for stream transformation. - * - * It is possible to keep state in the concrete [[Transformer]] instance with - * ordinary instance variables. The [[Transformer]] is executed by an actor and - * therefore you don not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * @see [[akka.stream.scaladsl.Flow#transform]] - * @see [[akka.stream.javadsl.Flow#transform]] - */ -abstract class Transformer[-T, +U] { +abstract class TransformerLike[-T, +U] { /** * Invoked for each element to produce a (possibly empty) sequence of * output elements. @@ -55,9 +44,17 @@ abstract class Transformer[-T, +U] { */ def cleanup(): Unit = () - /** - * Name of this transformation step. Used as part of the actor name. - * Facilitates debugging and logging. - */ - def name: String = "transform" } + +/** + * General interface for stream transformation. + * + * It is possible to keep state in the concrete [[Transformer]] instance with + * ordinary instance variables. The [[Transformer]] is executed by an actor and + * therefore you don not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * @see [[akka.stream.scaladsl.Flow#transform]] + * @see [[akka.stream.javadsl.Flow#transform]] + */ +abstract class Transformer[-T, +U] extends TransformerLike[T, U] diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala index bb6ca9343e..1995b2d5d0 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala @@ -27,9 +27,9 @@ private[akka] trait TimedOps { def timed[I, O](flow: Flow[I], measuredOps: Flow[I] ⇒ Flow[O], onComplete: FiniteDuration ⇒ Unit): Flow[O] = { val ctx = new TimedFlowContext - val startWithTime = flow.transform(new StartTimedFlow(ctx)) + val startWithTime = flow.transform("startTimed", () ⇒ new StartTimedFlow(ctx)) val userFlow = measuredOps(startWithTime) - userFlow.transform(new StopTimed(ctx, onComplete)) + userFlow.transform("stopTimed", () ⇒ new StopTimed(ctx, onComplete)) } /** @@ -41,9 +41,9 @@ private[akka] trait TimedOps { // todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type) val ctx = new TimedFlowContext - val startWithTime: Duct[I, O] = duct.transform(new StartTimedFlow(ctx)) + val startWithTime: Duct[I, O] = duct.transform("startTimed", () ⇒ new StartTimedFlow(ctx)) val userFlow: Duct[O, Out] = measuredOps(startWithTime) - userFlow.transform(new StopTimed(ctx, onComplete)) + userFlow.transform("stopTimed", () ⇒ new StopTimed(ctx, onComplete)) } } @@ -61,7 +61,7 @@ private[akka] trait TimedIntervalBetweenOps { * Measures rolling interval between immediatly subsequent `matching(o: O)` elements. */ def timedIntervalBetween[O](flow: Flow[O], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Flow[O] = { - flow.transform(new TimedIntervalTransformer[O](matching, onInterval)) + flow.transform("timedInterval", () ⇒ new TimedIntervalTransformer[O](matching, onInterval)) } /** @@ -69,7 +69,7 @@ private[akka] trait TimedIntervalBetweenOps { */ def timedIntervalBetween[I, O](duct: Duct[I, O], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Duct[I, O] = { // todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type) - duct.transform(new TimedIntervalTransformer[O](matching, onInterval)) + duct.transform("timedInterval", () ⇒ new TimedIntervalTransformer[O](matching, onInterval)) } } @@ -100,8 +100,6 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { } final class StartTimedFlow[T](ctx: TimedFlowContext) extends Transformer[T, T] { - override def name = "startTimed" - private var started = false override def onNext(element: T) = { @@ -115,7 +113,6 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { } final class StopTimed[T](ctx: TimedFlowContext, _onComplete: FiniteDuration ⇒ Unit) extends Transformer[T, T] { - override def name = "stopTimed" override def cleanup() { val d = ctx.stop() @@ -126,8 +123,6 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { } final class TimedIntervalTransformer[T](matching: T ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit) extends Transformer[T, T] { - override def name = "timedInterval" - private var prevNanos = 0L private var matched = 0L diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 589ad13720..50437f10f8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -3,34 +3,18 @@ */ package akka.stream.impl +import java.util.concurrent.atomic.AtomicLong + +import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef } +import akka.pattern.ask +import akka.stream._ +import org.reactivestreams.{ Processor, Publisher, Subscriber } + import scala.annotation.tailrec import scala.collection.immutable -import org.reactivestreams.{ Publisher, Subscriber, Processor } -import akka.actor.ActorRefFactory -import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer, Transformer } -import scala.util.Try -import scala.concurrent.Future -import scala.util.Success -import scala.util.Failure -import java.util.concurrent.atomic.AtomicLong -import akka.actor.ActorContext -import akka.actor.ExtensionIdProvider -import akka.actor.ExtensionId -import akka.actor.ExtendedActorSystem -import akka.actor.ActorSystem -import akka.actor.Extension -import scala.concurrent.duration.FiniteDuration -import akka.stream.TimerTransformer -import akka.actor.Props -import akka.actor.Actor -import akka.actor.ActorRef -import akka.pattern.ask -import akka.util.Timeout +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import scala.concurrent.Await -import akka.actor.LocalActorRef -import akka.actor.RepointableActorRef -import akka.actor.ActorCell +import scala.util.{ Failure, Success } /** * INTERNAL API @@ -40,9 +24,8 @@ private[akka] object Ast { def name: String } - case class Transform(transformer: Transformer[Any, Any]) extends AstNode { - override def name = transformer.name - } + case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode + case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode case class MapFuture(f: Any ⇒ Future[Any]) extends AstNode { override def name = "mapFuture" } @@ -137,8 +120,7 @@ private[akka] case class ActorBasedFlowMaterializer( flowNameCounter: AtomicLong, namePrefix: String) extends FlowMaterializer(settings) { - import Ast._ - import ActorBasedFlowMaterializer._ + import akka.stream.impl.Ast._ def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) @@ -170,7 +152,7 @@ private[akka] case class ActorBasedFlowMaterializer( } } - private val identityTransform = Transform( + private val identityTransform = Transform("identity", () ⇒ new Transformer[Any, Any] { override def onNext(element: Any) = List(element) }) @@ -178,7 +160,6 @@ private[akka] case class ActorBasedFlowMaterializer( def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val impl = actorOf(ActorProcessor.props(settings, op), s"$flowName-$n-${op.name}") ActorProcessor(impl) - } def actorOf(props: Props, name: String): ActorRef = supervisor match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index b3c97c123f..6a890ed28e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -19,9 +19,8 @@ private[akka] object ActorProcessor { import Ast._ def props(settings: MaterializerSettings, op: AstNode): Props = (op match { - case Transform(transformer: TimerTransformer[_, _]) ⇒ - Props(new TimerTransformerProcessorsImpl(settings, transformer)) - case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) + case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) + case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index f2023c5a37..0fa05ec717 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -37,7 +37,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops override def toFuture()(implicit materializer: FlowMaterializer): Future[O] = { val p = Promise[O]() - transform(new Transformer[O, Unit] { + transform("toFuture", () ⇒ new Transformer[O, Unit] { var done = false override def onNext(in: O) = { p success in; done = true; Nil } override def onError(e: Throwable) = { p failure e } @@ -51,7 +51,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) override def onComplete(callback: Try[Unit] ⇒ Unit)(implicit materializer: FlowMaterializer): Unit = - transform(new Transformer[O, Unit] { + transform("onComplete", () ⇒ new Transformer[O, Unit] { override def onNext(in: O) = Nil override def onError(e: Throwable) = { callback(Failure(e)) @@ -95,7 +95,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) override def onComplete(callback: Try[Unit] ⇒ Unit)(implicit materializer: FlowMaterializer): Subscriber[In] = - transform(new Transformer[Out, Unit] { + transform("onComplete", () ⇒ new Transformer[Out, Unit] { override def onNext(in: Out) = Nil override def onError(e: Throwable) = { callback(Failure(e)) @@ -155,45 +155,41 @@ private[akka] trait Builder[Out] { protected def andThen[U](op: Ast.AstNode): Thing[U] def map[U](f: Out ⇒ U): Thing[U] = - transform(new Transformer[Out, U] { + transform("map", () ⇒ new Transformer[Out, U] { override def onNext(in: Out) = List(f(in)) - override def name = "map" }) def mapFuture[U](f: Out ⇒ Future[U]): Thing[U] = andThen(MapFuture(f.asInstanceOf[Any ⇒ Future[Any]])) def filter(p: Out ⇒ Boolean): Thing[Out] = - transform(new Transformer[Out, Out] { + transform("filter", () ⇒ new Transformer[Out, Out] { override def onNext(in: Out) = if (p(in)) List(in) else Nil - override def name = "filter" }) def collect[U](pf: PartialFunction[Out, U]): Thing[U] = - transform(new Transformer[Out, U] { + transform("collect", () ⇒ new Transformer[Out, U] { override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil }) def foreachTransform(c: Out ⇒ Unit): Thing[Unit] = - transform(new Transformer[Out, Unit] { + transform("foreach", () ⇒ new Transformer[Out, Unit] { override def onNext(in: Out) = { c(in); Nil } override def onTermination(e: Option[Throwable]) = ListOfUnit - override def name = "foreach" }) def fold[U](zero: U)(f: (U, Out) ⇒ U): Thing[U] = - transform(new FoldTransformer[U](zero, f)) + transform("fold", () ⇒ new FoldTransformer[U](zero, f)) // Without this class compiler complains about // "Parameter type in structural refinement may not refer to an abstract type defined outside that refinement" class FoldTransformer[S](var state: S, f: (S, Out) ⇒ S) extends Transformer[Out, S] { override def onNext(in: Out): immutable.Seq[S] = { state = f(state, in); Nil } override def onTermination(e: Option[Throwable]): immutable.Seq[S] = List(state) - override def name = "fold" } def drop(n: Int): Thing[Out] = - transform(new Transformer[Out, Out] { + transform("drop", () ⇒ new Transformer[Out, Out] { var delegate: Transformer[Out, Out] = if (n == 0) identityTransformer.asInstanceOf[Transformer[Out, Out]] else new Transformer[Out, Out] { @@ -207,11 +203,10 @@ private[akka] trait Builder[Out] { } override def onNext(in: Out) = delegate.onNext(in) - override def name = "drop" }) def dropWithin(d: FiniteDuration): Thing[Out] = - transform(new TimerTransformer[Out, Out] { + timerTransform("dropWithin", () ⇒ new TimerTransformer[Out, Out] { scheduleOnce(DropWithinTimerKey, d) var delegate: Transformer[Out, Out] = @@ -224,11 +219,10 @@ private[akka] trait Builder[Out] { delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] Nil } - override def name = "dropWithin" }) def take(n: Int): Thing[Out] = - transform(new Transformer[Out, Out] { + transform("take", () ⇒ new Transformer[Out, Out] { var delegate: Transformer[Out, Out] = if (n == 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] else new Transformer[Out, Out] { @@ -243,11 +237,10 @@ private[akka] trait Builder[Out] { override def onNext(in: Out) = delegate.onNext(in) override def isComplete = delegate.isComplete - override def name = "take" }) def takeWithin(d: FiniteDuration): Thing[Out] = - transform(new TimerTransformer[Out, Out] { + timerTransform("takeWithin", () ⇒ new TimerTransformer[Out, Out] { scheduleOnce(TakeWithinTimerKey, d) var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]] @@ -258,13 +251,12 @@ private[akka] trait Builder[Out] { delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] Nil } - override def name = "takeWithin" }) def prefixAndTail[U >: Out](n: Int): Thing[(immutable.Seq[Out], Publisher[U])] = andThen(PrefixAndTail(n)) def grouped(n: Int): Thing[immutable.Seq[Out]] = - transform(new Transformer[Out, immutable.Seq[Out]] { + transform("grouped", () ⇒ new Transformer[Out, immutable.Seq[Out]] { var buf: Vector[Out] = Vector.empty override def onNext(in: Out) = { buf :+= in @@ -276,11 +268,10 @@ private[akka] trait Builder[Out] { Nil } override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) - override def name = "grouped" }) def groupedWithin(n: Int, d: FiniteDuration): Thing[immutable.Seq[Out]] = - transform(new TimerTransformer[Out, immutable.Seq[Out]] { + timerTransform("groupedWithin", () ⇒ new TimerTransformer[Out, immutable.Seq[Out]] { schedulePeriodically(GroupedWithinTimerKey, d) var buf: Vector[Out] = Vector.empty @@ -301,17 +292,18 @@ private[akka] trait Builder[Out] { buf = Vector.empty List(group) } - override def name = "groupedWithin" }) def mapConcat[U](f: Out ⇒ immutable.Seq[U]): Thing[U] = - transform(new Transformer[Out, U] { + transform("mapConcat", () ⇒ new Transformer[Out, U] { override def onNext(in: Out) = f(in) - override def name = "mapConcat" }) - def transform[U](transformer: Transformer[Out, U]): Thing[U] = - andThen(Transform(transformer.asInstanceOf[Transformer[Any, Any]])) + def transform[U](name: String, mkTransformer: () ⇒ Transformer[Out, U]): Thing[U] = + andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) + + def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Thing[U] = + andThen(TimerTransform(name, mkTransformer.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) def zip[O2](other: Publisher[O2]): Thing[(Out, O2)] = andThen(Zip(other.asInstanceOf[Publisher[Any]])) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 594b26cf50..1d20373176 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -3,17 +3,18 @@ */ package akka.stream.impl -import scala.collection.immutable -import scala.util.{ Failure, Success } import akka.actor.Props -import akka.stream.MaterializerSettings -import akka.stream.Transformer +import akka.stream.{ MaterializerSettings, TransformerLike } + +import scala.collection.immutable import scala.util.control.NonFatal /** * INTERNAL API */ -private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, transformer: Transformer[Any, Any]) extends ActorProcessorImpl(_settings) { +private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, transformer: TransformerLike[Any, Any]) + extends ActorProcessorImpl(_settings) { + var hasCleanupRun = false // TODO performance improvement: mutable buffer? var emits = immutable.Seq.empty[Any] diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 7163e55b51..62f4cd4f49 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -14,7 +14,7 @@ import akka.japi.Pair import akka.japi.Predicate import akka.japi.Procedure import akka.japi.Util.immutableSeq -import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } +import akka.stream._ import akka.stream.scaladsl.{ Duct ⇒ SDuct } import akka.stream.impl.Ast import scala.concurrent.duration.FiniteDuration @@ -151,10 +151,35 @@ abstract class Duct[In, Out] { * therefore you don not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. * - * Note that you can use [[akka.stream.TimerTransformer]] if you need support - * for scheduled events in the transformer. + * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. */ - def transform[U](transformer: Transformer[Out, U]): Duct[In, U] + def transform[U](name: String, transformer: () ⇒ Transformer[Out, U]): Duct[In, U] + + /** + * Transformation of a stream, with additional support for scheduled events. + * + * For each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. + */ + def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Duct[In, U] /** * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element @@ -375,8 +400,11 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def mapConcat[U](f: Function[T, java.util.List[U]]): Duct[In, U] = new DuctAdapter(delegate.mapConcat(elem ⇒ immutableSeq(f.apply(elem)))) - override def transform[U](transformer: Transformer[T, U]): Duct[In, U] = - new DuctAdapter(delegate.transform(transformer)) + override def transform[U](name: String, mkTransformer: () ⇒ Transformer[T, U]): Duct[In, U] = + new DuctAdapter(delegate.transform(name, mkTransformer)) + + override def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[T, U]): Duct[In, U] = + new DuctAdapter(delegate.timerTransform(name, mkTransformer)) override def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[T], Publisher[T]]] = new DuctAdapter(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ Pair(taken.asJava, tail) }) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index a635bf9b7b..679ed6eacd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -10,13 +10,9 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import org.reactivestreams.{ Publisher, Subscriber } -import akka.japi.Function -import akka.japi.Function2 -import akka.japi.Pair -import akka.japi.Predicate -import akka.japi.Procedure +import akka.japi._ import akka.japi.Util.immutableSeq -import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } +import akka.stream._ import akka.stream.scaladsl.{ Flow ⇒ SFlow } import scala.concurrent.duration.FiniteDuration import akka.dispatch.ExecutionContexts @@ -197,7 +193,7 @@ abstract class Flow[T] { /** * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] - * function is invoked and expecting a (possibly empty) sequence of output elements + * function is invoked, expecting a (possibly empty) sequence of output elements * to be produced. * After handing off the elements produced from one input element to the downstream * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end @@ -206,6 +202,8 @@ abstract class Flow[T] { * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) * sequence of elements in response to the end-of-stream event. * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. * * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with @@ -213,10 +211,35 @@ abstract class Flow[T] { * therefore you do not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. * - * Note that you can use [[akka.stream.TimerTransformer]] if you need support - * for scheduled events in the transformer. + * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. */ - def transform[U](transformer: Transformer[T, U]): Flow[U] + def transform[U](name: String, mkTransformer: Creator[Transformer[T, U]]): Flow[U] + + /** + * Transformation of a stream, with additional support for scheduled events. + * + * For each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. + */ + def timerTransform[U](name: String, mkTransformer: Creator[TimerTransformer[T, U]]): Flow[U] /** * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element @@ -441,8 +464,11 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U] = new FlowAdapter(delegate.mapConcat(elem ⇒ immutableSeq(f.apply(elem)))) - override def transform[U](transformer: Transformer[T, U]): Flow[U] = - new FlowAdapter(delegate.transform(transformer)) + override def transform[U](name: String, transformer: Creator[Transformer[T, U]]): Flow[U] = + new FlowAdapter(delegate.transform(name, () ⇒ transformer.create())) + + override def timerTransform[U](name: String, transformer: Creator[TimerTransformer[T, U]]): Flow[U] = + new FlowAdapter(delegate.timerTransform(name, () ⇒ transformer.create())) override def prefixAndTail(n: Int): Flow[Pair[java.util.List[T], Publisher[T]]] = new FlowAdapter(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ Pair(taken.asJava, tail) }) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index da22908a63..05c264daf8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import scala.collection.immutable import scala.util.Try import org.reactivestreams.{ Publisher, Subscriber } -import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } +import akka.stream._ import akka.stream.impl.DuctImpl import akka.stream.impl.Ast import scala.concurrent.duration.FiniteDuration @@ -140,10 +140,35 @@ trait Duct[In, +Out] { * therefore you don not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. * - * Note that you can use [[akka.stream.TimerTransformer]] if you need support - * for scheduled events in the transformer. + * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. */ - def transform[U](transformer: Transformer[Out, U]): Duct[In, U] + def transform[U](name: String, transformer: () ⇒ Transformer[Out, U]): Duct[In, U] + + /** + * Transformation of a stream, with additional support for scheduled events. + * + * For each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. + */ + def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[Out, U]): Duct[In, U] /** * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 98a52de972..9adb3b0ea6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -7,7 +7,7 @@ import scala.collection.immutable import scala.concurrent.Future import scala.util.Try import org.reactivestreams.{ Publisher, Subscriber } -import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } +import akka.stream._ import akka.stream.impl.Ast.{ ExistingPublisher, IterablePublisherNode, IteratorPublisherNode, ThunkPublisherNode } import akka.stream.impl.Ast.FuturePublisherNode import akka.stream.impl.FlowImpl @@ -209,10 +209,35 @@ trait Flow[+T] { * therefore you do not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. * - * Note that you can use [[akka.stream.TimerTransformer]] if you need support - * for scheduled events in the transformer. + * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. */ - def transform[U](transformer: Transformer[T, U]): Flow[U] + def transform[U](name: String, mkTransformer: () ⇒ Transformer[T, U]): Flow[U] + + /** + * Transformation of a stream, with additional support for scheduled events. + * + * For each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. + */ + def timerTransform[U](name: String, mkTransformer: () ⇒ TimerTransformer[T, U]): Flow[U] /** * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index 312b119e2c..98777d4cfa 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -1,40 +1,26 @@ package akka.stream.javadsl; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import akka.stream.FlattenStrategy; -import akka.stream.OverflowStrategy; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.dispatch.Mapper; +import akka.japi.*; +import akka.stream.*; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; import org.junit.ClassRule; import org.junit.Test; -import static org.junit.Assert.assertEquals; import org.reactivestreams.Publisher; import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; -import akka.japi.Function; -import akka.japi.Function2; -import akka.japi.Pair; -import akka.japi.Predicate; -import akka.japi.Procedure; -import akka.japi.Util; -import akka.stream.FlowMaterializer; -import akka.stream.MaterializerSettings; -import akka.stream.Transformer; -import akka.stream.testkit.AkkaSpec; -import akka.testkit.JavaTestKit; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; public class FlowTest { @@ -113,30 +99,35 @@ public class FlowTest { final JavaTestKit probe2 = new JavaTestKit(system); final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // duplicate each element, stop after 4 elements, and emit sum to the end - Flow.create(input).transform(new Transformer() { - int sum = 0; - int count = 0; - + Flow.create(input).transform("publish", new Creator>() { @Override - public scala.collection.immutable.Seq onNext(Integer element) { - sum += element; - count += 1; - return Util.immutableSeq(new Integer[] { element, element }); - } + public Transformer create() throws Exception { + return new Transformer() { + int sum = 0; + int count = 0; - @Override - public boolean isComplete() { - return count == 4; - } + @Override + public scala.collection.immutable.Seq onNext(Integer element) { + sum += element; + count += 1; + return Util.immutableSeq(new Integer[]{element, element}); + } - @Override - public scala.collection.immutable.Seq onTermination(Option e) { - return Util.immutableSingletonSeq(sum); - } + @Override + public boolean isComplete() { + return count == 4; + } - @Override - public void cleanup() { - probe2.getRef().tell("cleanup", ActorRef.noSender()); + @Override + public scala.collection.immutable.Seq onTermination(Option e) { + return Util.immutableSingletonSeq(sum); + } + + @Override + public void cleanup() { + probe2.getRef().tell("cleanup", ActorRef.noSender()); + } + }; } }).foreach(new Procedure() { public void apply(Integer elem) { @@ -167,34 +158,40 @@ public class FlowTest { else return elem + elem; } - }).transform(new Transformer() { - + }).transform("publish", new Creator>() { @Override - public scala.collection.immutable.Seq onNext(Integer element) { - return Util.immutableSingletonSeq(element.toString()); - } + public Transformer create() throws Exception { + return new Transformer() { - @Override - public scala.collection.immutable.Seq onTermination(Option e) { - if (e.isEmpty()) - return Util.immutableSeq(new String[0]); - else - return Util.immutableSingletonSeq(e.get().getMessage()); - } + @Override + public scala.collection.immutable.Seq onNext(Integer element) { + return Util.immutableSingletonSeq(element.toString()); + } - @Override - public void onError(Throwable e) { - } + @Override + public scala.collection.immutable.Seq onTermination(Option e) { + if (e.isEmpty()) { + return Util.immutableSeq(new String[0]); + } else { + return Util.immutableSingletonSeq(e.get().getMessage()); + } + } - @Override - public boolean isComplete() { - return false; - } + @Override + public void onError(Throwable e) { + } - @Override - public void cleanup() { - } + @Override + public boolean isComplete() { + return false; + } + @Override + public void cleanup() { + } + + }; + } }).foreach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala index 69c7e5cfc7..578e0a80a1 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala @@ -21,18 +21,18 @@ class FlowTakeWithinSpec extends AkkaSpec { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() Flow(p).takeWithin(1.second).produceTo(c) - val pSub = p.expectSubscription - val cSub = c.expectSubscription + val pSub = p.expectSubscription() + val cSub = c.expectSubscription() cSub.request(100) - val demand1 = pSub.expectRequest + val demand1 = pSub.expectRequest() (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand2 = pSub.expectRequest + val demand2 = pSub.expectRequest() (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand3 = pSub.expectRequest + val demand3 = pSub.expectRequest() val sentN = demand1 + demand2 (1 to sentN) foreach { n ⇒ c.expectNext(n) } within(2.seconds) { - c.expectComplete + c.expectComplete() } (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } c.expectNoMsg(200.millis) @@ -41,11 +41,11 @@ class FlowTakeWithinSpec extends AkkaSpec { "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[Int]() Flow(1 to 3).takeWithin(1.second).produceTo(c) - val cSub = c.expectSubscription + val cSub = c.expectSubscription() c.expectNoMsg(200.millis) cSub.request(100) (1 to 3) foreach { n ⇒ c.expectNext(n) } - c.expectComplete + c.expectComplete() c.expectNoMsg(200.millis) } diff --git a/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala index 9b142086ad..781ea44482 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTimerTransformerSpec.scala @@ -3,28 +3,22 @@ */ package akka.stream -import scala.concurrent.duration._ -import akka.stream.testkit.StreamTestKit -import akka.stream.testkit.AkkaSpec -import akka.testkit.EventFilter -import com.typesafe.config.ConfigFactory import akka.stream.scaladsl.Flow -import akka.testkit.TestProbe +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } + +import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import scala.collection.immutable @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTimerTransformerSpec extends AkkaSpec { - import system.dispatcher - implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) "A Flow with TimerTransformer operations" must { "produce scheduled ticks as expected" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new TimerTransformer[Int, Int] { + timerTransform("timer", () ⇒ new TimerTransformer[Int, Int] { schedulePeriodically("tick", 100.millis) var tickCount = 0 override def onNext(elem: Int) = List(elem) @@ -49,7 +43,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { "schedule ticks when last transformation step (consume)" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new TimerTransformer[Int, Int] { + timerTransform("timer", () ⇒ new TimerTransformer[Int, Int] { schedulePeriodically("tick", 100.millis) var tickCount = 0 override def onNext(elem: Int) = List(elem) @@ -62,7 +56,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { override def isComplete: Boolean = !isTimerActive("tick") }). consume() - val pSub = p.expectSubscription + val pSub = p.expectSubscription() expectMsg("tick-1") expectMsg("tick-2") expectMsg("tick-3") @@ -73,7 +67,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { val exception = new Exception("Expected exception to the rule") with NoStackTrace val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new TimerTransformer[Int, Int] { + timerTransform("timer", () ⇒ new TimerTransformer[Int, Int] { scheduleOnce("tick", 100.millis) def onNext(element: Int) = Nil diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index 5837de84f6..f751afd90b 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -42,7 +42,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce one-to-one transformation as expected" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem @@ -70,7 +70,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce one-to-several transformation as expected" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem @@ -101,7 +101,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce dropping transformation as expected" in { val p = Flow(List(1, 2, 3, 4).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem @@ -129,14 +129,14 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce multi-step transformation as expected" in { val p = Flow(List("a", "bc", "def").iterator).toPublisher() val p2 = Flow(p). - transform(new TryRecoveryTransformer[String, Int] { + transform("transform", () ⇒ new TryRecoveryTransformer[String, Int] { var concat = "" override def onNext(element: Try[String]) = { concat += element List(concat.length) } }). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(length: Int) = { tot += length @@ -173,7 +173,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "invoke onComplete when done" in { val p = Flow(List("a").iterator).toPublisher() val p2 = Flow(p). - transform(new TryRecoveryTransformer[String, String] { + transform("transform", () ⇒ new TryRecoveryTransformer[String, String] { var s = "" override def onNext(element: Try[String]) = { s += element @@ -193,7 +193,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "allow cancellation using isComplete" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new TryRecoveryTransformer[Int, Int] { + transform("transform", () ⇒ new TryRecoveryTransformer[Int, Int] { var s = "" override def onNext(element: Try[Int]) = { s += element @@ -217,7 +217,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "call onComplete after isComplete signaled completion" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new TryRecoveryTransformer[Int, Int] { + transform("transform", () ⇒ new TryRecoveryTransformer[Int, Int] { var s = "" override def onNext(element: Try[Int]) = { s += element @@ -243,7 +243,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "report error when exception is thrown" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = { if (elem == 2) throw new IllegalArgumentException("two not allowed") else List(elem, elem) @@ -272,7 +272,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { if (elem == 2) throw new IllegalArgumentException("two not allowed") else (1 to 5).map(elem * 100 + _) }. - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem) override def onError(e: Throwable) = () override def onTermination(e: Option[Throwable]) = e match { @@ -317,7 +317,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "transform errors in sequence with normal messages" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new Transformer[Int, String] { + transform("transform", () ⇒ new Transformer[Int, String] { var s = "" override def onNext(element: Int) = { s += element.toString @@ -350,7 +350,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "forward errors when received and thrown" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(in: Int) = List(in) override def onError(e: Throwable) = throw e }). @@ -369,7 +369,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "support cancel as expected" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) override def onError(e: Throwable) = List(-1) }). diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 7c67033310..3cd81ac15b 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -3,21 +3,18 @@ */ package akka.stream -import scala.concurrent.duration._ -import akka.stream.testkit.StreamTestKit -import akka.stream.testkit.AkkaSpec -import akka.testkit.EventFilter -import com.typesafe.config.ConfigFactory import akka.stream.scaladsl.Flow -import akka.testkit.TestProbe +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.testkit.{ EventFilter, TestProbe } +import com.typesafe.config.ConfigFactory + +import scala.collection.immutable.Seq +import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import scala.collection.immutable @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { - import system.dispatcher - implicit val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, @@ -29,7 +26,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "produce one-to-one transformation as expected" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem @@ -52,7 +49,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "produce one-to-several transformation as expected" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem @@ -78,11 +75,15 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "produce dropping transformation as expected" in { val p = Flow(List(1, 2, 3, 4).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem - if (elem % 2 == 0) Nil else List(tot) + if (elem % 2 == 0) { + Nil + } else { + List(tot) + } } }). toPublisher() @@ -101,14 +102,14 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "produce multi-step transformation as expected" in { val p = Flow(List("a", "bc", "def").iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[String, Int] { + transform("transform", () ⇒ new Transformer[String, Int] { var concat = "" override def onNext(elem: String) = { concat += elem List(concat.length) } }). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 override def onNext(length: Int) = { tot += length @@ -140,7 +141,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke onComplete when done" in { val p = Flow(List("a").iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[String, String] { + transform("transform", () ⇒ new Transformer[String, String] { var s = "" override def onNext(element: String) = { s += element @@ -161,7 +162,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d val cleanupProbe = TestProbe() val p = Flow(List("a").iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[String, String] { + transform("transform", () ⇒ new Transformer[String, String] { var s = "" override def onNext(element: String) = { s += element @@ -184,7 +185,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d val cleanupProbe = TestProbe() val p = Flow(List("a").iterator).toPublisher() Flow(p). - transform(new Transformer[String, String] { + transform("transform", () ⇒ new Transformer[String, String] { var s = "x" override def onNext(element: String) = { s = element @@ -200,11 +201,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d val cleanupProbe = TestProbe() val p = Flow(List("a", "b", "c").iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[String, String] { + transform("transform", () ⇒ new Transformer[String, String] { var s = "" override def onNext(in: String) = { - if (in == "b") throw new IllegalArgumentException("Not b") with NoStackTrace - else { + if (in == "b") { + throw new IllegalArgumentException("Not b") with NoStackTrace + } else { val out = s + in s += in.toUpperCase List(out) @@ -227,7 +229,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "allow cancellation using isComplete" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var s = "" override def onNext(element: Int) = { s += element @@ -252,7 +254,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d val cleanupProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { var s = "" override def onNext(element: Int) = { s += element @@ -280,10 +282,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "report error when exception is thrown" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = { - if (elem == 2) throw new IllegalArgumentException("two not allowed") - else List(elem, elem) + if (elem == 2) { + throw new IllegalArgumentException("two not allowed") + } else { + List(elem, elem) + } } }). toPublisher() @@ -302,7 +307,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "support cancel as expected" in { val p = Flow(List(1, 2, 3).iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) }). toPublisher() @@ -321,7 +326,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "support producing elements from empty inputs" in { val p = Flow(List.empty[Int].iterator).toPublisher() val p2 = Flow(p). - transform(new Transformer[Int, Int] { + transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = Nil override def onTermination(e: Option[Throwable]) = List(1, 2, 3) }). @@ -339,7 +344,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "support converting onComplete into onError" in { val subscriber = StreamTestKit.SubscriberProbe[Int]() - Flow(List(5, 1, 2, 3)).transform(new Transformer[Int, Int] { + Flow(List(5, 1, 2, 3)).transform("transform", () ⇒ new Transformer[Int, Int] { var expectedNumberOfElements: Option[Int] = None var count = 0 override def onNext(elem: Int) = @@ -354,7 +359,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d case Some(e) ⇒ Nil case None ⇒ expectedNumberOfElements match { - case Some(expected) if (count != expected) ⇒ + case Some(expected) if count != expected ⇒ throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace case _ ⇒ Nil } @@ -367,7 +372,31 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d subscriber.expectNext(1) subscriber.expectNext(2) subscriber.expectNext(3) - subscriber.expectError.getMessage should be("Expected 5, got 3") + subscriber.expectError().getMessage should be("Expected 5, got 3") + } + + "be safe to reuse" in { + val flow = Flow(1 to 3).transform("transform", () ⇒ + new Transformer[Int, Int] { + var count = 0 + + override def onNext(elem: Int): Seq[Int] = { + count += 1 + List(count) + } + }) + + val s1 = StreamTestKit.SubscriberProbe[Int]() + flow.produceTo(s1) + s1.expectSubscription().request(3) + s1.expectNext(1, 2, 3) + s1.expectComplete() + + val s2 = StreamTestKit.SubscriberProbe[Int]() + flow.produceTo(s2) + s2.expectSubscription().request(3) + s2.expectNext(1, 2, 3) + s2.expectComplete() } }