Merge pull request #15725 from ktoso/wip-transformerFactory-ktoso

!str #15271 make transform() take a factory instead of Transformer
This commit is contained in:
Konrad Malawski 2014-08-26 16:15:36 +02:00
commit 55e48918fd
26 changed files with 368 additions and 281 deletions

View file

@ -44,14 +44,14 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
Duct[(HttpRequest, Any)] Duct[(HttpRequest, Any)]
.broadcast(contextBypassSubscriber) .broadcast(contextBypassSubscriber)
.map(requestMethodByPass) .map(requestMethodByPass)
.transform(responseRendererFactory.newRenderer) .transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat) .flatten(FlattenStrategy.concat)
.transform(errorLogger(log, "Outgoing request stream error")) .transform("errorLogger", () errorLogger(log, "Outgoing request stream error"))
.produceTo(tcpConn.outputStream)(materializer) .produceTo(tcpConn.outputStream)(materializer)
val responsePublisher = val responsePublisher =
Flow(tcpConn.inputStream) Flow(tcpConn.inputStream)
.transform(rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass)) .transform("rootParser", () rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass))
.splitWhen(_.isInstanceOf[MessageStart]) .splitWhen(_.isInstanceOf[MessageStart])
.headAndTail(materializer) .headAndTail(materializer)
.collect { .collect {

View file

@ -43,7 +43,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
*/ */
def toStrict(timeout: FiniteDuration, materializer: FlowMaterializer)(implicit ec: ExecutionContext): Future[HttpEntity.Strict] = def toStrict(timeout: FiniteDuration, materializer: FlowMaterializer)(implicit ec: ExecutionContext): Future[HttpEntity.Strict] =
Flow(dataBytes(materializer)) Flow(dataBytes(materializer))
.transform(new TimerTransformer[ByteString, HttpEntity.Strict] { .timerTransform("toStrict", () new TimerTransformer[ByteString, HttpEntity.Strict] {
var bytes = ByteString.newBuilder var bytes = ByteString.newBuilder
scheduleOnce("", timeout) scheduleOnce("", timeout)

View file

@ -106,12 +106,12 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`
case HttpEntity.Default(_, contentLength, data) case HttpEntity.Default(_, contentLength, data)
renderContentLength(contentLength) renderContentLength(contentLength)
renderByteStrings(r, renderByteStrings(r,
Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer), Flow(data).transform("checkContentLenght", () new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer),
materializer) materializer)
case HttpEntity.Chunked(_, chunks) case HttpEntity.Chunked(_, chunks)
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf ~~ CrLf r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf ~~ CrLf
renderByteStrings(r, Flow(chunks).transform(new ChunkTransformer).toPublisher()(materializer), materializer) renderByteStrings(r, Flow(chunks).transform("chunkTransform", () new ChunkTransformer).toPublisher()(materializer), materializer)
} }
renderRequestLine() renderRequestLine()

View file

@ -132,7 +132,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
renderHeaders(headers.toList) renderHeaders(headers.toList)
renderEntityContentType(r, entity) renderEntityContentType(r, entity)
r ~~ `Content-Length` ~~ contentLength ~~ CrLf ~~ CrLf r ~~ `Content-Length` ~~ contentLength ~~ CrLf ~~ CrLf
byteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer)) byteStrings(Flow(data).transform("checkContentLenght", () new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer))
case HttpEntity.CloseDelimited(_, data) case HttpEntity.CloseDelimited(_, data)
renderHeaders(headers.toList, alwaysClose = true) renderHeaders(headers.toList, alwaysClose = true)
@ -149,7 +149,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
if (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) if (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD)
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf
r ~~ CrLf r ~~ CrLf
byteStrings(Flow(chunks).transform(new ChunkTransformer).toPublisher()(materializer)) byteStrings(Flow(chunks).transform("checkContentLenght", () new ChunkTransformer).toPublisher()(materializer))
} }
} }

View file

@ -40,7 +40,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
val requestPublisher = val requestPublisher =
Flow(tcpConn.inputStream) Flow(tcpConn.inputStream)
.transform(rootParser.copyWith(warnOnIllegalHeader)) .transform("rootParser", () rootParser.copyWith(warnOnIllegalHeader))
// this will create extra single element `[MessageEnd]` substreams, that will // this will create extra single element `[MessageEnd]` substreams, that will
// be filtered out by the above `collect` for the applicationBypass and the // be filtered out by the above `collect` for the applicationBypass and the
// below `collect` for the actual request handling // below `collect` for the actual request handling
@ -58,10 +58,10 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
val responseSubscriber = val responseSubscriber =
Duct[HttpResponse] Duct[HttpResponse]
.merge(applicationBypassPublisher) .merge(applicationBypassPublisher)
.transform(applyApplicationBypass) .transform("applyApplicationBypass", () applyApplicationBypass)
.transform(responseRendererFactory.newRenderer) .transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat) .flatten(FlattenStrategy.concat)
.transform(errorLogger(log, "Outgoing response stream error")) .transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
.produceTo(tcpConn.outputStream)(materializer) .produceTo(tcpConn.outputStream)(materializer)
Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber) Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber)

View file

@ -37,8 +37,8 @@ package object util {
private[http] implicit class FlowWithPrintEvent[T](val underlying: Flow[T]) { private[http] implicit class FlowWithPrintEvent[T](val underlying: Flow[T]) {
def printEvent(marker: String): Flow[T] = def printEvent(marker: String): Flow[T] =
underlying.transform { underlying.transform("transform",
new Transformer[T, T] { () new Transformer[T, T] {
def onNext(element: T) = { def onNext(element: T) = {
println(s"$marker: $element") println(s"$marker: $element")
element :: Nil element :: Nil
@ -47,8 +47,7 @@ package object util {
println(s"$marker: Terminated with error $e") println(s"$marker: Terminated with error $e")
Nil Nil
} }
} })
}
} }
private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] =

View file

@ -354,7 +354,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val future = val future =
Flow(input.toList) Flow(input.toList)
.map(ByteString.apply) .map(ByteString.apply)
.transform(parser) .transform("parser", () parser)
.splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) .splitWhen(_.isInstanceOf[ParserOutput.MessageStart])
.headAndTail(materializer) .headAndTail(materializer)
.collect { .collect {

View file

@ -213,7 +213,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val future = val future =
Flow(input.toList) Flow(input.toList)
.map(ByteString.apply) .map(ByteString.apply)
.transform(newParser(requestMethod)) .transform("parser", () newParser(requestMethod))
.splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) .splitWhen(_.isInstanceOf[ParserOutput.MessageStart])
.headAndTail(materializer) .headAndTail(materializer)
.collect { .collect {

View file

@ -39,7 +39,7 @@ trait MultipartMarshallers {
Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset) Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset)
val log = actorSystem(refFactory).log val log = actorSystem(refFactory).log
val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, fm, log) val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, fm, log)
val chunks = Flow(value.parts).transform(bodyPartRenderer).flatten(FlattenStrategy.concat).toPublisher()(fm) val chunks = Flow(value.parts).transform("bodyPartRenderer", () bodyPartRenderer).flatten(FlattenStrategy.concat).toPublisher()(fm)
HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks) HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks)
} }
} }

View file

@ -40,7 +40,7 @@ trait MultipartUnmarshallers {
case None sys.error("Content-Type with a multipart media type must have a 'boundary' parameter") case None sys.error("Content-Type with a multipart media type must have a 'boundary' parameter")
case Some(boundary) case Some(boundary)
val bodyParts = Flow(entity.dataBytes(fm)) val bodyParts = Flow(entity.dataBytes(fm))
.transform(new BodyPartParser(defaultContentType, boundary, fm, actorSystem(refFactory).log)) .transform("bodyPart", () new BodyPartParser(defaultContentType, boundary, fm, actorSystem(refFactory).log))
.splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart]) .splitWhen(_.isInstanceOf[BodyPartParser.BodyPartStart])
.headAndTail(fm) .headAndTail(fm)
.collect { .collect {

View file

@ -3,16 +3,15 @@
*/ */
package akka.stream package akka.stream
import scala.collection.immutable import akka.actor.{ ActorContext, Cancellable }
import scala.collection.mutable
import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorContext
import akka.actor.Cancellable
/** /**
* [[Transformer]] with support for scheduling keyed (named) timer events. * [[Transformer]] with support for scheduling keyed (named) timer events.
*/ */
abstract class TimerTransformer[-T, +U] extends Transformer[T, U] { abstract class TimerTransformer[-T, +U] extends TransformerLike[T, U] {
import TimerTransformer._ import TimerTransformer._
private val timers = mutable.Map[Any, Timer]() private val timers = mutable.Map[Any, Timer]()
private val timerIdGen = Iterator from 1 private val timerIdGen = Iterator from 1

View file

@ -5,18 +5,7 @@ package akka.stream
import scala.collection.immutable import scala.collection.immutable
/** abstract class TransformerLike[-T, +U] {
* General interface for stream transformation.
*
* It is possible to keep state in the concrete [[Transformer]] instance with
* ordinary instance variables. The [[Transformer]] is executed by an actor and
* therefore you don not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* @see [[akka.stream.scaladsl.Flow#transform]]
* @see [[akka.stream.javadsl.Flow#transform]]
*/
abstract class Transformer[-T, +U] {
/** /**
* Invoked for each element to produce a (possibly empty) sequence of * Invoked for each element to produce a (possibly empty) sequence of
* output elements. * output elements.
@ -55,9 +44,17 @@ abstract class Transformer[-T, +U] {
*/ */
def cleanup(): Unit = () def cleanup(): Unit = ()
/**
* Name of this transformation step. Used as part of the actor name.
* Facilitates debugging and logging.
*/
def name: String = "transform"
} }
/**
* General interface for stream transformation.
*
* It is possible to keep state in the concrete [[Transformer]] instance with
* ordinary instance variables. The [[Transformer]] is executed by an actor and
* therefore you don not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* @see [[akka.stream.scaladsl.Flow#transform]]
* @see [[akka.stream.javadsl.Flow#transform]]
*/
abstract class Transformer[-T, +U] extends TransformerLike[T, U]

View file

@ -27,9 +27,9 @@ private[akka] trait TimedOps {
def timed[I, O](flow: Flow[I], measuredOps: Flow[I] Flow[O], onComplete: FiniteDuration Unit): Flow[O] = { def timed[I, O](flow: Flow[I], measuredOps: Flow[I] Flow[O], onComplete: FiniteDuration Unit): Flow[O] = {
val ctx = new TimedFlowContext val ctx = new TimedFlowContext
val startWithTime = flow.transform(new StartTimedFlow(ctx)) val startWithTime = flow.transform("startTimed", () new StartTimedFlow(ctx))
val userFlow = measuredOps(startWithTime) val userFlow = measuredOps(startWithTime)
userFlow.transform(new StopTimed(ctx, onComplete)) userFlow.transform("stopTimed", () new StopTimed(ctx, onComplete))
} }
/** /**
@ -41,9 +41,9 @@ private[akka] trait TimedOps {
// todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type) // todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type)
val ctx = new TimedFlowContext val ctx = new TimedFlowContext
val startWithTime: Duct[I, O] = duct.transform(new StartTimedFlow(ctx)) val startWithTime: Duct[I, O] = duct.transform("startTimed", () new StartTimedFlow(ctx))
val userFlow: Duct[O, Out] = measuredOps(startWithTime) val userFlow: Duct[O, Out] = measuredOps(startWithTime)
userFlow.transform(new StopTimed(ctx, onComplete)) userFlow.transform("stopTimed", () new StopTimed(ctx, onComplete))
} }
} }
@ -61,7 +61,7 @@ private[akka] trait TimedIntervalBetweenOps {
* Measures rolling interval between immediatly subsequent `matching(o: O)` elements. * Measures rolling interval between immediatly subsequent `matching(o: O)` elements.
*/ */
def timedIntervalBetween[O](flow: Flow[O], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[O] = { def timedIntervalBetween[O](flow: Flow[O], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[O] = {
flow.transform(new TimedIntervalTransformer[O](matching, onInterval)) flow.transform("timedInterval", () new TimedIntervalTransformer[O](matching, onInterval))
} }
/** /**
@ -69,7 +69,7 @@ private[akka] trait TimedIntervalBetweenOps {
*/ */
def timedIntervalBetween[I, O](duct: Duct[I, O], matching: O Boolean, onInterval: FiniteDuration Unit): Duct[I, O] = { def timedIntervalBetween[I, O](duct: Duct[I, O], matching: O Boolean, onInterval: FiniteDuration Unit): Duct[I, O] = {
// todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type) // todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type)
duct.transform(new TimedIntervalTransformer[O](matching, onInterval)) duct.transform("timedInterval", () new TimedIntervalTransformer[O](matching, onInterval))
} }
} }
@ -100,8 +100,6 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
} }
final class StartTimedFlow[T](ctx: TimedFlowContext) extends Transformer[T, T] { final class StartTimedFlow[T](ctx: TimedFlowContext) extends Transformer[T, T] {
override def name = "startTimed"
private var started = false private var started = false
override def onNext(element: T) = { override def onNext(element: T) = {
@ -115,7 +113,6 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
} }
final class StopTimed[T](ctx: TimedFlowContext, _onComplete: FiniteDuration Unit) extends Transformer[T, T] { final class StopTimed[T](ctx: TimedFlowContext, _onComplete: FiniteDuration Unit) extends Transformer[T, T] {
override def name = "stopTimed"
override def cleanup() { override def cleanup() {
val d = ctx.stop() val d = ctx.stop()
@ -126,8 +123,6 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
} }
final class TimedIntervalTransformer[T](matching: T Boolean, onInterval: FiniteDuration Unit) extends Transformer[T, T] { final class TimedIntervalTransformer[T](matching: T Boolean, onInterval: FiniteDuration Unit) extends Transformer[T, T] {
override def name = "timedInterval"
private var prevNanos = 0L private var prevNanos = 0L
private var matched = 0L private var matched = 0L

View file

@ -3,34 +3,18 @@
*/ */
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef }
import akka.pattern.ask
import akka.stream._
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import org.reactivestreams.{ Publisher, Subscriber, Processor } import scala.concurrent.{ Await, Future }
import akka.actor.ActorRefFactory
import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer, Transformer }
import scala.util.Try
import scala.concurrent.Future
import scala.util.Success
import scala.util.Failure
import java.util.concurrent.atomic.AtomicLong
import akka.actor.ActorContext
import akka.actor.ExtensionIdProvider
import akka.actor.ExtensionId
import akka.actor.ExtendedActorSystem
import akka.actor.ActorSystem
import akka.actor.Extension
import scala.concurrent.duration.FiniteDuration
import akka.stream.TimerTransformer
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await import scala.util.{ Failure, Success }
import akka.actor.LocalActorRef
import akka.actor.RepointableActorRef
import akka.actor.ActorCell
/** /**
* INTERNAL API * INTERNAL API
@ -40,9 +24,8 @@ private[akka] object Ast {
def name: String def name: String
} }
case class Transform(transformer: Transformer[Any, Any]) extends AstNode { case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode
override def name = transformer.name case class TimerTransform(name: String, mkTransformer: () TimerTransformer[Any, Any]) extends AstNode
}
case class MapFuture(f: Any Future[Any]) extends AstNode { case class MapFuture(f: Any Future[Any]) extends AstNode {
override def name = "mapFuture" override def name = "mapFuture"
} }
@ -137,8 +120,7 @@ private[akka] case class ActorBasedFlowMaterializer(
flowNameCounter: AtomicLong, flowNameCounter: AtomicLong,
namePrefix: String) namePrefix: String)
extends FlowMaterializer(settings) { extends FlowMaterializer(settings) {
import Ast._ import akka.stream.impl.Ast._
import ActorBasedFlowMaterializer._
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
@ -170,7 +152,7 @@ private[akka] case class ActorBasedFlowMaterializer(
} }
} }
private val identityTransform = Transform( private val identityTransform = Transform("identity", ()
new Transformer[Any, Any] { new Transformer[Any, Any] {
override def onNext(element: Any) = List(element) override def onNext(element: Any) = List(element)
}) })
@ -178,7 +160,6 @@ private[akka] case class ActorBasedFlowMaterializer(
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessor.props(settings, op), s"$flowName-$n-${op.name}") val impl = actorOf(ActorProcessor.props(settings, op), s"$flowName-$n-${op.name}")
ActorProcessor(impl) ActorProcessor(impl)
} }
def actorOf(props: Props, name: String): ActorRef = supervisor match { def actorOf(props: Props, name: String): ActorRef = supervisor match {

View file

@ -19,9 +19,8 @@ private[akka] object ActorProcessor {
import Ast._ import Ast._
def props(settings: MaterializerSettings, op: AstNode): Props = def props(settings: MaterializerSettings, op: AstNode): Props =
(op match { (op match {
case Transform(transformer: TimerTransformer[_, _]) case t: TimerTransform Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer()))
Props(new TimerTransformerProcessorsImpl(settings, transformer)) case t: Transform Props(new TransformProcessorImpl(settings, t.mkTransformer()))
case t: Transform Props(new TransformProcessorImpl(settings, t.transformer))
case s: SplitWhen Props(new SplitWhenProcessorImpl(settings, s.p)) case s: SplitWhen Props(new SplitWhenProcessorImpl(settings, s.p))
case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f)) case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f))
case m: Merge Props(new MergeImpl(settings, m.other)) case m: Merge Props(new MergeImpl(settings, m.other))

View file

@ -37,7 +37,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops
override def toFuture()(implicit materializer: FlowMaterializer): Future[O] = { override def toFuture()(implicit materializer: FlowMaterializer): Future[O] = {
val p = Promise[O]() val p = Promise[O]()
transform(new Transformer[O, Unit] { transform("toFuture", () new Transformer[O, Unit] {
var done = false var done = false
override def onNext(in: O) = { p success in; done = true; Nil } override def onNext(in: O) = { p success in; done = true; Nil }
override def onError(e: Throwable) = { p failure e } override def onError(e: Throwable) = { p failure e }
@ -51,7 +51,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops
produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize))
override def onComplete(callback: Try[Unit] Unit)(implicit materializer: FlowMaterializer): Unit = override def onComplete(callback: Try[Unit] Unit)(implicit materializer: FlowMaterializer): Unit =
transform(new Transformer[O, Unit] { transform("onComplete", () new Transformer[O, Unit] {
override def onNext(in: O) = Nil override def onNext(in: O) = Nil
override def onError(e: Throwable) = { override def onError(e: Throwable) = {
callback(Failure(e)) callback(Failure(e))
@ -95,7 +95,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[
produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize)) produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize))
override def onComplete(callback: Try[Unit] Unit)(implicit materializer: FlowMaterializer): Subscriber[In] = override def onComplete(callback: Try[Unit] Unit)(implicit materializer: FlowMaterializer): Subscriber[In] =
transform(new Transformer[Out, Unit] { transform("onComplete", () new Transformer[Out, Unit] {
override def onNext(in: Out) = Nil override def onNext(in: Out) = Nil
override def onError(e: Throwable) = { override def onError(e: Throwable) = {
callback(Failure(e)) callback(Failure(e))
@ -155,45 +155,41 @@ private[akka] trait Builder[Out] {
protected def andThen[U](op: Ast.AstNode): Thing[U] protected def andThen[U](op: Ast.AstNode): Thing[U]
def map[U](f: Out U): Thing[U] = def map[U](f: Out U): Thing[U] =
transform(new Transformer[Out, U] { transform("map", () new Transformer[Out, U] {
override def onNext(in: Out) = List(f(in)) override def onNext(in: Out) = List(f(in))
override def name = "map"
}) })
def mapFuture[U](f: Out Future[U]): Thing[U] = def mapFuture[U](f: Out Future[U]): Thing[U] =
andThen(MapFuture(f.asInstanceOf[Any Future[Any]])) andThen(MapFuture(f.asInstanceOf[Any Future[Any]]))
def filter(p: Out Boolean): Thing[Out] = def filter(p: Out Boolean): Thing[Out] =
transform(new Transformer[Out, Out] { transform("filter", () new Transformer[Out, Out] {
override def onNext(in: Out) = if (p(in)) List(in) else Nil override def onNext(in: Out) = if (p(in)) List(in) else Nil
override def name = "filter"
}) })
def collect[U](pf: PartialFunction[Out, U]): Thing[U] = def collect[U](pf: PartialFunction[Out, U]): Thing[U] =
transform(new Transformer[Out, U] { transform("collect", () new Transformer[Out, U] {
override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil
}) })
def foreachTransform(c: Out Unit): Thing[Unit] = def foreachTransform(c: Out Unit): Thing[Unit] =
transform(new Transformer[Out, Unit] { transform("foreach", () new Transformer[Out, Unit] {
override def onNext(in: Out) = { c(in); Nil } override def onNext(in: Out) = { c(in); Nil }
override def onTermination(e: Option[Throwable]) = ListOfUnit override def onTermination(e: Option[Throwable]) = ListOfUnit
override def name = "foreach"
}) })
def fold[U](zero: U)(f: (U, Out) U): Thing[U] = def fold[U](zero: U)(f: (U, Out) U): Thing[U] =
transform(new FoldTransformer[U](zero, f)) transform("fold", () new FoldTransformer[U](zero, f))
// Without this class compiler complains about // Without this class compiler complains about
// "Parameter type in structural refinement may not refer to an abstract type defined outside that refinement" // "Parameter type in structural refinement may not refer to an abstract type defined outside that refinement"
class FoldTransformer[S](var state: S, f: (S, Out) S) extends Transformer[Out, S] { class FoldTransformer[S](var state: S, f: (S, Out) S) extends Transformer[Out, S] {
override def onNext(in: Out): immutable.Seq[S] = { state = f(state, in); Nil } override def onNext(in: Out): immutable.Seq[S] = { state = f(state, in); Nil }
override def onTermination(e: Option[Throwable]): immutable.Seq[S] = List(state) override def onTermination(e: Option[Throwable]): immutable.Seq[S] = List(state)
override def name = "fold"
} }
def drop(n: Int): Thing[Out] = def drop(n: Int): Thing[Out] =
transform(new Transformer[Out, Out] { transform("drop", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] = var delegate: Transformer[Out, Out] =
if (n == 0) identityTransformer.asInstanceOf[Transformer[Out, Out]] if (n == 0) identityTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] { else new Transformer[Out, Out] {
@ -207,11 +203,10 @@ private[akka] trait Builder[Out] {
} }
override def onNext(in: Out) = delegate.onNext(in) override def onNext(in: Out) = delegate.onNext(in)
override def name = "drop"
}) })
def dropWithin(d: FiniteDuration): Thing[Out] = def dropWithin(d: FiniteDuration): Thing[Out] =
transform(new TimerTransformer[Out, Out] { timerTransform("dropWithin", () new TimerTransformer[Out, Out] {
scheduleOnce(DropWithinTimerKey, d) scheduleOnce(DropWithinTimerKey, d)
var delegate: Transformer[Out, Out] = var delegate: Transformer[Out, Out] =
@ -224,11 +219,10 @@ private[akka] trait Builder[Out] {
delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]]
Nil Nil
} }
override def name = "dropWithin"
}) })
def take(n: Int): Thing[Out] = def take(n: Int): Thing[Out] =
transform(new Transformer[Out, Out] { transform("take", () new Transformer[Out, Out] {
var delegate: Transformer[Out, Out] = var delegate: Transformer[Out, Out] =
if (n == 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] if (n == 0) takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
else new Transformer[Out, Out] { else new Transformer[Out, Out] {
@ -243,11 +237,10 @@ private[akka] trait Builder[Out] {
override def onNext(in: Out) = delegate.onNext(in) override def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete override def isComplete = delegate.isComplete
override def name = "take"
}) })
def takeWithin(d: FiniteDuration): Thing[Out] = def takeWithin(d: FiniteDuration): Thing[Out] =
transform(new TimerTransformer[Out, Out] { timerTransform("takeWithin", () new TimerTransformer[Out, Out] {
scheduleOnce(TakeWithinTimerKey, d) scheduleOnce(TakeWithinTimerKey, d)
var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]] var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]]
@ -258,13 +251,12 @@ private[akka] trait Builder[Out] {
delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
Nil Nil
} }
override def name = "takeWithin"
}) })
def prefixAndTail[U >: Out](n: Int): Thing[(immutable.Seq[Out], Publisher[U])] = andThen(PrefixAndTail(n)) def prefixAndTail[U >: Out](n: Int): Thing[(immutable.Seq[Out], Publisher[U])] = andThen(PrefixAndTail(n))
def grouped(n: Int): Thing[immutable.Seq[Out]] = def grouped(n: Int): Thing[immutable.Seq[Out]] =
transform(new Transformer[Out, immutable.Seq[Out]] { transform("grouped", () new Transformer[Out, immutable.Seq[Out]] {
var buf: Vector[Out] = Vector.empty var buf: Vector[Out] = Vector.empty
override def onNext(in: Out) = { override def onNext(in: Out) = {
buf :+= in buf :+= in
@ -276,11 +268,10 @@ private[akka] trait Builder[Out] {
Nil Nil
} }
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
override def name = "grouped"
}) })
def groupedWithin(n: Int, d: FiniteDuration): Thing[immutable.Seq[Out]] = def groupedWithin(n: Int, d: FiniteDuration): Thing[immutable.Seq[Out]] =
transform(new TimerTransformer[Out, immutable.Seq[Out]] { timerTransform("groupedWithin", () new TimerTransformer[Out, immutable.Seq[Out]] {
schedulePeriodically(GroupedWithinTimerKey, d) schedulePeriodically(GroupedWithinTimerKey, d)
var buf: Vector[Out] = Vector.empty var buf: Vector[Out] = Vector.empty
@ -301,17 +292,18 @@ private[akka] trait Builder[Out] {
buf = Vector.empty buf = Vector.empty
List(group) List(group)
} }
override def name = "groupedWithin"
}) })
def mapConcat[U](f: Out immutable.Seq[U]): Thing[U] = def mapConcat[U](f: Out immutable.Seq[U]): Thing[U] =
transform(new Transformer[Out, U] { transform("mapConcat", () new Transformer[Out, U] {
override def onNext(in: Out) = f(in) override def onNext(in: Out) = f(in)
override def name = "mapConcat"
}) })
def transform[U](transformer: Transformer[Out, U]): Thing[U] = def transform[U](name: String, mkTransformer: () Transformer[Out, U]): Thing[U] =
andThen(Transform(transformer.asInstanceOf[Transformer[Any, Any]])) andThen(Transform(name, mkTransformer.asInstanceOf[() Transformer[Any, Any]]))
def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Thing[U] =
andThen(TimerTransform(name, mkTransformer.asInstanceOf[() TimerTransformer[Any, Any]]))
def zip[O2](other: Publisher[O2]): Thing[(Out, O2)] = andThen(Zip(other.asInstanceOf[Publisher[Any]])) def zip[O2](other: Publisher[O2]): Thing[(Out, O2)] = andThen(Zip(other.asInstanceOf[Publisher[Any]]))

View file

@ -3,17 +3,18 @@
*/ */
package akka.stream.impl package akka.stream.impl
import scala.collection.immutable
import scala.util.{ Failure, Success }
import akka.actor.Props import akka.actor.Props
import akka.stream.MaterializerSettings import akka.stream.{ MaterializerSettings, TransformerLike }
import akka.stream.Transformer
import scala.collection.immutable
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, transformer: Transformer[Any, Any]) extends ActorProcessorImpl(_settings) { private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, transformer: TransformerLike[Any, Any])
extends ActorProcessorImpl(_settings) {
var hasCleanupRun = false var hasCleanupRun = false
// TODO performance improvement: mutable buffer? // TODO performance improvement: mutable buffer?
var emits = immutable.Seq.empty[Any] var emits = immutable.Seq.empty[Any]

View file

@ -14,7 +14,7 @@ import akka.japi.Pair
import akka.japi.Predicate import akka.japi.Predicate
import akka.japi.Procedure import akka.japi.Procedure
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream._
import akka.stream.scaladsl.{ Duct SDuct } import akka.stream.scaladsl.{ Duct SDuct }
import akka.stream.impl.Ast import akka.stream.impl.Ast
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
@ -151,10 +151,35 @@ abstract class Duct[In, Out] {
* therefore you don not have to add any additional thread safety or memory * therefore you don not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods. * visibility constructs to access the state from the callback methods.
* *
* Note that you can use [[akka.stream.TimerTransformer]] if you need support * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer.
* for scheduled events in the transformer.
*/ */
def transform[U](transformer: Transformer[Out, U]): Duct[In, U] def transform[U](name: String, transformer: () Transformer[Out, U]): Duct[In, U]
/**
* Transformation of a stream, with additional support for scheduled events.
*
* For each element the [[akka.stream.Transformer#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced.
* After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
* stream processing at this point; in that case the upstream subscription is
* canceled. Before signaling normal completion to the downstream subscribers,
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event.
*
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Duct[In, U]
/** /**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element
@ -375,8 +400,11 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In,
override def mapConcat[U](f: Function[T, java.util.List[U]]): Duct[In, U] = override def mapConcat[U](f: Function[T, java.util.List[U]]): Duct[In, U] =
new DuctAdapter(delegate.mapConcat(elem immutableSeq(f.apply(elem)))) new DuctAdapter(delegate.mapConcat(elem immutableSeq(f.apply(elem))))
override def transform[U](transformer: Transformer[T, U]): Duct[In, U] = override def transform[U](name: String, mkTransformer: () Transformer[T, U]): Duct[In, U] =
new DuctAdapter(delegate.transform(transformer)) new DuctAdapter(delegate.transform(name, mkTransformer))
override def timerTransform[U](name: String, mkTransformer: () TimerTransformer[T, U]): Duct[In, U] =
new DuctAdapter(delegate.timerTransform(name, mkTransformer))
override def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[T], Publisher[T]]] = override def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[T], Publisher[T]]] =
new DuctAdapter(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail) }) new DuctAdapter(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail) })

View file

@ -10,13 +10,9 @@ import scala.concurrent.Future
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import akka.japi.Function import akka.japi._
import akka.japi.Function2
import akka.japi.Pair
import akka.japi.Predicate
import akka.japi.Procedure
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream._
import akka.stream.scaladsl.{ Flow SFlow } import akka.stream.scaladsl.{ Flow SFlow }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
@ -197,7 +193,7 @@ abstract class Flow[T] {
/** /**
* Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]]
* function is invoked and expecting a (possibly empty) sequence of output elements * function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced. * to be produced.
* After handing off the elements produced from one input element to the downstream * After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
@ -206,6 +202,8 @@ abstract class Flow[T] {
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event. * sequence of elements in response to the end-of-stream event.
* *
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
* *
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
@ -213,10 +211,35 @@ abstract class Flow[T] {
* therefore you do not have to add any additional thread safety or memory * therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods. * visibility constructs to access the state from the callback methods.
* *
* Note that you can use [[akka.stream.TimerTransformer]] if you need support * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer.
* for scheduled events in the transformer.
*/ */
def transform[U](transformer: Transformer[T, U]): Flow[U] def transform[U](name: String, mkTransformer: Creator[Transformer[T, U]]): Flow[U]
/**
* Transformation of a stream, with additional support for scheduled events.
*
* For each element the [[akka.stream.Transformer#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced.
* After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
* stream processing at this point; in that case the upstream subscription is
* canceled. Before signaling normal completion to the downstream subscribers,
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event.
*
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
def timerTransform[U](name: String, mkTransformer: Creator[TimerTransformer[T, U]]): Flow[U]
/** /**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element
@ -441,8 +464,11 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
override def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U] = override def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U] =
new FlowAdapter(delegate.mapConcat(elem immutableSeq(f.apply(elem)))) new FlowAdapter(delegate.mapConcat(elem immutableSeq(f.apply(elem))))
override def transform[U](transformer: Transformer[T, U]): Flow[U] = override def transform[U](name: String, transformer: Creator[Transformer[T, U]]): Flow[U] =
new FlowAdapter(delegate.transform(transformer)) new FlowAdapter(delegate.transform(name, () transformer.create()))
override def timerTransform[U](name: String, transformer: Creator[TimerTransformer[T, U]]): Flow[U] =
new FlowAdapter(delegate.timerTransform(name, () transformer.create()))
override def prefixAndTail(n: Int): Flow[Pair[java.util.List[T], Publisher[T]]] = override def prefixAndTail(n: Int): Flow[Pair[java.util.List[T], Publisher[T]]] =
new FlowAdapter(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail) }) new FlowAdapter(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail) })

View file

@ -6,7 +6,7 @@ package akka.stream.scaladsl
import scala.collection.immutable import scala.collection.immutable
import scala.util.Try import scala.util.Try
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream._
import akka.stream.impl.DuctImpl import akka.stream.impl.DuctImpl
import akka.stream.impl.Ast import akka.stream.impl.Ast
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
@ -140,10 +140,35 @@ trait Duct[In, +Out] {
* therefore you don not have to add any additional thread safety or memory * therefore you don not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods. * visibility constructs to access the state from the callback methods.
* *
* Note that you can use [[akka.stream.TimerTransformer]] if you need support * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer.
* for scheduled events in the transformer.
*/ */
def transform[U](transformer: Transformer[Out, U]): Duct[In, U] def transform[U](name: String, transformer: () Transformer[Out, U]): Duct[In, U]
/**
* Transformation of a stream, with additional support for scheduled events.
*
* For each element the [[akka.stream.Transformer#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced.
* After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
* stream processing at this point; in that case the upstream subscription is
* canceled. Before signaling normal completion to the downstream subscribers,
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event.
*
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Duct[In, U]
/** /**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element

View file

@ -7,7 +7,7 @@ import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.Try import scala.util.Try
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream._
import akka.stream.impl.Ast.{ ExistingPublisher, IterablePublisherNode, IteratorPublisherNode, ThunkPublisherNode } import akka.stream.impl.Ast.{ ExistingPublisher, IterablePublisherNode, IteratorPublisherNode, ThunkPublisherNode }
import akka.stream.impl.Ast.FuturePublisherNode import akka.stream.impl.Ast.FuturePublisherNode
import akka.stream.impl.FlowImpl import akka.stream.impl.FlowImpl
@ -209,10 +209,35 @@ trait Flow[+T] {
* therefore you do not have to add any additional thread safety or memory * therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods. * visibility constructs to access the state from the callback methods.
* *
* Note that you can use [[akka.stream.TimerTransformer]] if you need support * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer.
* for scheduled events in the transformer.
*/ */
def transform[U](transformer: Transformer[T, U]): Flow[U] def transform[U](name: String, mkTransformer: () Transformer[T, U]): Flow[U]
/**
* Transformation of a stream, with additional support for scheduled events.
*
* For each element the [[akka.stream.Transformer#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced.
* After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
* stream processing at this point; in that case the upstream subscription is
* canceled. Before signaling normal completion to the downstream subscribers,
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event.
*
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
def timerTransform[U](name: String, mkTransformer: () TimerTransformer[T, U]): Flow[U]
/** /**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element

View file

@ -1,40 +1,26 @@
package akka.stream.javadsl; package akka.stream.javadsl;
import java.util.ArrayList; import akka.actor.ActorRef;
import java.util.Arrays; import akka.actor.ActorSystem;
import java.util.HashMap; import akka.dispatch.Futures;
import java.util.HashSet; import akka.dispatch.Mapper;
import java.util.List; import akka.japi.*;
import java.util.Map; import akka.stream.*;
import java.util.Set; import akka.stream.testkit.AkkaSpec;
import java.util.concurrent.Callable; import akka.testkit.JavaTestKit;
import java.util.concurrent.TimeUnit;
import akka.stream.FlattenStrategy;
import akka.stream.OverflowStrategy;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import scala.Option; import scala.Option;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef; import java.util.*;
import akka.actor.ActorSystem; import java.util.concurrent.Callable;
import akka.dispatch.Futures; import java.util.concurrent.TimeUnit;
import akka.dispatch.Mapper;
import akka.japi.Function; import static org.junit.Assert.assertEquals;
import akka.japi.Function2;
import akka.japi.Pair;
import akka.japi.Predicate;
import akka.japi.Procedure;
import akka.japi.Util;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.Transformer;
import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
public class FlowTest { public class FlowTest {
@ -113,30 +99,35 @@ public class FlowTest {
final JavaTestKit probe2 = new JavaTestKit(system); final JavaTestKit probe2 = new JavaTestKit(system);
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
// duplicate each element, stop after 4 elements, and emit sum to the end // duplicate each element, stop after 4 elements, and emit sum to the end
Flow.create(input).transform(new Transformer<Integer, Integer>() { Flow.create(input).transform("publish", new Creator<Transformer<Integer, Integer>>() {
int sum = 0;
int count = 0;
@Override @Override
public scala.collection.immutable.Seq<Integer> onNext(Integer element) { public Transformer<Integer, Integer> create() throws Exception {
sum += element; return new Transformer<Integer, Integer>() {
count += 1; int sum = 0;
return Util.immutableSeq(new Integer[] { element, element }); int count = 0;
}
@Override @Override
public boolean isComplete() { public scala.collection.immutable.Seq<Integer> onNext(Integer element) {
return count == 4; sum += element;
} count += 1;
return Util.immutableSeq(new Integer[]{element, element});
}
@Override @Override
public scala.collection.immutable.Seq<Integer> onTermination(Option<Throwable> e) { public boolean isComplete() {
return Util.immutableSingletonSeq(sum); return count == 4;
} }
@Override @Override
public void cleanup() { public scala.collection.immutable.Seq<Integer> onTermination(Option<Throwable> e) {
probe2.getRef().tell("cleanup", ActorRef.noSender()); return Util.immutableSingletonSeq(sum);
}
@Override
public void cleanup() {
probe2.getRef().tell("cleanup", ActorRef.noSender());
}
};
} }
}).foreach(new Procedure<Integer>() { }).foreach(new Procedure<Integer>() {
public void apply(Integer elem) { public void apply(Integer elem) {
@ -167,34 +158,40 @@ public class FlowTest {
else else
return elem + elem; return elem + elem;
} }
}).transform(new Transformer<Integer, String>() { }).transform("publish", new Creator<Transformer<Integer, String>>() {
@Override @Override
public scala.collection.immutable.Seq<String> onNext(Integer element) { public Transformer<Integer, String> create() throws Exception {
return Util.immutableSingletonSeq(element.toString()); return new Transformer<Integer, String>() {
}
@Override @Override
public scala.collection.immutable.Seq<String> onTermination(Option<Throwable> e) { public scala.collection.immutable.Seq<String> onNext(Integer element) {
if (e.isEmpty()) return Util.immutableSingletonSeq(element.toString());
return Util.immutableSeq(new String[0]); }
else
return Util.immutableSingletonSeq(e.get().getMessage());
}
@Override @Override
public void onError(Throwable e) { public scala.collection.immutable.Seq<String> onTermination(Option<Throwable> e) {
} if (e.isEmpty()) {
return Util.immutableSeq(new String[0]);
} else {
return Util.immutableSingletonSeq(e.get().getMessage());
}
}
@Override @Override
public boolean isComplete() { public void onError(Throwable e) {
return false; }
}
@Override @Override
public void cleanup() { public boolean isComplete() {
} return false;
}
@Override
public void cleanup() {
}
};
}
}).foreach(new Procedure<String>() { }).foreach(new Procedure<String>() {
public void apply(String elem) { public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender()); probe.getRef().tell(elem, ActorRef.noSender());

View file

@ -21,18 +21,18 @@ class FlowTakeWithinSpec extends AkkaSpec {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
Flow(p).takeWithin(1.second).produceTo(c) Flow(p).takeWithin(1.second).produceTo(c)
val pSub = p.expectSubscription val pSub = p.expectSubscription()
val cSub = c.expectSubscription val cSub = c.expectSubscription()
cSub.request(100) cSub.request(100)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest()
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest val demand2 = pSub.expectRequest()
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest val demand3 = pSub.expectRequest()
val sentN = demand1 + demand2 val sentN = demand1 + demand2
(1 to sentN) foreach { n c.expectNext(n) } (1 to sentN) foreach { n c.expectNext(n) }
within(2.seconds) { within(2.seconds) {
c.expectComplete c.expectComplete()
} }
(1 to demand3) foreach { _ pSub.sendNext(input.next()) } (1 to demand3) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
@ -41,11 +41,11 @@ class FlowTakeWithinSpec extends AkkaSpec {
"deliver bufferd elements onComplete before the timeout" in { "deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
Flow(1 to 3).takeWithin(1.second).produceTo(c) Flow(1 to 3).takeWithin(1.second).produceTo(c)
val cSub = c.expectSubscription val cSub = c.expectSubscription()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
cSub.request(100) cSub.request(100)
(1 to 3) foreach { n c.expectNext(n) } (1 to 3) foreach { n c.expectNext(n) }
c.expectComplete c.expectComplete()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
} }

View file

@ -3,28 +3,22 @@
*/ */
package akka.stream package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.AkkaSpec
import akka.testkit.EventFilter
import com.typesafe.config.ConfigFactory
import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Flow
import akka.testkit.TestProbe import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.collection.immutable
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowTimerTransformerSpec extends AkkaSpec { class FlowTimerTransformerSpec extends AkkaSpec {
import system.dispatcher
implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) implicit val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
"A Flow with TimerTransformer operations" must { "A Flow with TimerTransformer operations" must {
"produce scheduled ticks as expected" in { "produce scheduled ticks as expected" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new TimerTransformer[Int, Int] { timerTransform("timer", () new TimerTransformer[Int, Int] {
schedulePeriodically("tick", 100.millis) schedulePeriodically("tick", 100.millis)
var tickCount = 0 var tickCount = 0
override def onNext(elem: Int) = List(elem) override def onNext(elem: Int) = List(elem)
@ -49,7 +43,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
"schedule ticks when last transformation step (consume)" in { "schedule ticks when last transformation step (consume)" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new TimerTransformer[Int, Int] { timerTransform("timer", () new TimerTransformer[Int, Int] {
schedulePeriodically("tick", 100.millis) schedulePeriodically("tick", 100.millis)
var tickCount = 0 var tickCount = 0
override def onNext(elem: Int) = List(elem) override def onNext(elem: Int) = List(elem)
@ -62,7 +56,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
override def isComplete: Boolean = !isTimerActive("tick") override def isComplete: Boolean = !isTimerActive("tick")
}). }).
consume() consume()
val pSub = p.expectSubscription val pSub = p.expectSubscription()
expectMsg("tick-1") expectMsg("tick-1")
expectMsg("tick-2") expectMsg("tick-2")
expectMsg("tick-3") expectMsg("tick-3")
@ -73,7 +67,7 @@ class FlowTimerTransformerSpec extends AkkaSpec {
val exception = new Exception("Expected exception to the rule") with NoStackTrace val exception = new Exception("Expected exception to the rule") with NoStackTrace
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new TimerTransformer[Int, Int] { timerTransform("timer", () new TimerTransformer[Int, Int] {
scheduleOnce("tick", 100.millis) scheduleOnce("tick", 100.millis)
def onNext(element: Int) = Nil def onNext(element: Int) = Nil

View file

@ -42,7 +42,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"produce one-to-one transformation as expected" in { "produce one-to-one transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
tot += elem tot += elem
@ -70,7 +70,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"produce one-to-several transformation as expected" in { "produce one-to-several transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
tot += elem tot += elem
@ -101,7 +101,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"produce dropping transformation as expected" in { "produce dropping transformation as expected" in {
val p = Flow(List(1, 2, 3, 4).iterator).toPublisher() val p = Flow(List(1, 2, 3, 4).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
tot += elem tot += elem
@ -129,14 +129,14 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"produce multi-step transformation as expected" in { "produce multi-step transformation as expected" in {
val p = Flow(List("a", "bc", "def").iterator).toPublisher() val p = Flow(List("a", "bc", "def").iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new TryRecoveryTransformer[String, Int] { transform("transform", () new TryRecoveryTransformer[String, Int] {
var concat = "" var concat = ""
override def onNext(element: Try[String]) = { override def onNext(element: Try[String]) = {
concat += element concat += element
List(concat.length) List(concat.length)
} }
}). }).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(length: Int) = { override def onNext(length: Int) = {
tot += length tot += length
@ -173,7 +173,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"invoke onComplete when done" in { "invoke onComplete when done" in {
val p = Flow(List("a").iterator).toPublisher() val p = Flow(List("a").iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new TryRecoveryTransformer[String, String] { transform("transform", () new TryRecoveryTransformer[String, String] {
var s = "" var s = ""
override def onNext(element: Try[String]) = { override def onNext(element: Try[String]) = {
s += element s += element
@ -193,7 +193,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"allow cancellation using isComplete" in { "allow cancellation using isComplete" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new TryRecoveryTransformer[Int, Int] { transform("transform", () new TryRecoveryTransformer[Int, Int] {
var s = "" var s = ""
override def onNext(element: Try[Int]) = { override def onNext(element: Try[Int]) = {
s += element s += element
@ -217,7 +217,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"call onComplete after isComplete signaled completion" in { "call onComplete after isComplete signaled completion" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new TryRecoveryTransformer[Int, Int] { transform("transform", () new TryRecoveryTransformer[Int, Int] {
var s = "" var s = ""
override def onNext(element: Try[Int]) = { override def onNext(element: Try[Int]) = {
s += element s += element
@ -243,7 +243,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"report error when exception is thrown" in { "report error when exception is thrown" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
if (elem == 2) throw new IllegalArgumentException("two not allowed") if (elem == 2) throw new IllegalArgumentException("two not allowed")
else List(elem, elem) else List(elem, elem)
@ -272,7 +272,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
if (elem == 2) throw new IllegalArgumentException("two not allowed") if (elem == 2) throw new IllegalArgumentException("two not allowed")
else (1 to 5).map(elem * 100 + _) else (1 to 5).map(elem * 100 + _)
}. }.
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = List(elem) override def onNext(elem: Int) = List(elem)
override def onError(e: Throwable) = () override def onError(e: Throwable) = ()
override def onTermination(e: Option[Throwable]) = e match { override def onTermination(e: Option[Throwable]) = e match {
@ -317,7 +317,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"transform errors in sequence with normal messages" in { "transform errors in sequence with normal messages" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, String] { transform("transform", () new Transformer[Int, String] {
var s = "" var s = ""
override def onNext(element: Int) = { override def onNext(element: Int) = {
s += element.toString s += element.toString
@ -350,7 +350,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"forward errors when received and thrown" in { "forward errors when received and thrown" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(in: Int) = List(in) override def onNext(in: Int) = List(in)
override def onError(e: Throwable) = throw e override def onError(e: Throwable) = throw e
}). }).
@ -369,7 +369,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"support cancel as expected" in { "support cancel as expected" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = List(elem, elem) override def onNext(elem: Int) = List(elem, elem)
override def onError(e: Throwable) = List(-1) override def onError(e: Throwable) = List(-1)
}). }).

View file

@ -3,21 +3,18 @@
*/ */
package akka.stream package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.AkkaSpec
import akka.testkit.EventFilter
import com.typesafe.config.ConfigFactory
import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Flow
import akka.testkit.TestProbe import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.testkit.{ EventFilter, TestProbe }
import com.typesafe.config.ConfigFactory
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.collection.immutable
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
import system.dispatcher
implicit val materializer = FlowMaterializer(MaterializerSettings( implicit val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2, initialInputBufferSize = 2,
maximumInputBufferSize = 2, maximumInputBufferSize = 2,
@ -29,7 +26,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"produce one-to-one transformation as expected" in { "produce one-to-one transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
tot += elem tot += elem
@ -52,7 +49,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"produce one-to-several transformation as expected" in { "produce one-to-several transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
tot += elem tot += elem
@ -78,11 +75,15 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"produce dropping transformation as expected" in { "produce dropping transformation as expected" in {
val p = Flow(List(1, 2, 3, 4).iterator).toPublisher() val p = Flow(List(1, 2, 3, 4).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
tot += elem tot += elem
if (elem % 2 == 0) Nil else List(tot) if (elem % 2 == 0) {
Nil
} else {
List(tot)
}
} }
}). }).
toPublisher() toPublisher()
@ -101,14 +102,14 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"produce multi-step transformation as expected" in { "produce multi-step transformation as expected" in {
val p = Flow(List("a", "bc", "def").iterator).toPublisher() val p = Flow(List("a", "bc", "def").iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[String, Int] { transform("transform", () new Transformer[String, Int] {
var concat = "" var concat = ""
override def onNext(elem: String) = { override def onNext(elem: String) = {
concat += elem concat += elem
List(concat.length) List(concat.length)
} }
}). }).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
override def onNext(length: Int) = { override def onNext(length: Int) = {
tot += length tot += length
@ -140,7 +141,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke onComplete when done" in { "invoke onComplete when done" in {
val p = Flow(List("a").iterator).toPublisher() val p = Flow(List("a").iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "" var s = ""
override def onNext(element: String) = { override def onNext(element: String) = {
s += element s += element
@ -161,7 +162,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
val cleanupProbe = TestProbe() val cleanupProbe = TestProbe()
val p = Flow(List("a").iterator).toPublisher() val p = Flow(List("a").iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "" var s = ""
override def onNext(element: String) = { override def onNext(element: String) = {
s += element s += element
@ -184,7 +185,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
val cleanupProbe = TestProbe() val cleanupProbe = TestProbe()
val p = Flow(List("a").iterator).toPublisher() val p = Flow(List("a").iterator).toPublisher()
Flow(p). Flow(p).
transform(new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "x" var s = "x"
override def onNext(element: String) = { override def onNext(element: String) = {
s = element s = element
@ -200,11 +201,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
val cleanupProbe = TestProbe() val cleanupProbe = TestProbe()
val p = Flow(List("a", "b", "c").iterator).toPublisher() val p = Flow(List("a", "b", "c").iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "" var s = ""
override def onNext(in: String) = { override def onNext(in: String) = {
if (in == "b") throw new IllegalArgumentException("Not b") with NoStackTrace if (in == "b") {
else { throw new IllegalArgumentException("Not b") with NoStackTrace
} else {
val out = s + in val out = s + in
s += in.toUpperCase s += in.toUpperCase
List(out) List(out)
@ -227,7 +229,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"allow cancellation using isComplete" in { "allow cancellation using isComplete" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var s = "" var s = ""
override def onNext(element: Int) = { override def onNext(element: Int) = {
s += element s += element
@ -252,7 +254,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
val cleanupProbe = TestProbe() val cleanupProbe = TestProbe()
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var s = "" var s = ""
override def onNext(element: Int) = { override def onNext(element: Int) = {
s += element s += element
@ -280,10 +282,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"report error when exception is thrown" in { "report error when exception is thrown" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
if (elem == 2) throw new IllegalArgumentException("two not allowed") if (elem == 2) {
else List(elem, elem) throw new IllegalArgumentException("two not allowed")
} else {
List(elem, elem)
}
} }
}). }).
toPublisher() toPublisher()
@ -302,7 +307,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"support cancel as expected" in { "support cancel as expected" in {
val p = Flow(List(1, 2, 3).iterator).toPublisher() val p = Flow(List(1, 2, 3).iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = List(elem, elem) override def onNext(elem: Int) = List(elem, elem)
}). }).
toPublisher() toPublisher()
@ -321,7 +326,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"support producing elements from empty inputs" in { "support producing elements from empty inputs" in {
val p = Flow(List.empty[Int].iterator).toPublisher() val p = Flow(List.empty[Int].iterator).toPublisher()
val p2 = Flow(p). val p2 = Flow(p).
transform(new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = Nil override def onNext(elem: Int) = Nil
override def onTermination(e: Option[Throwable]) = List(1, 2, 3) override def onTermination(e: Option[Throwable]) = List(1, 2, 3)
}). }).
@ -339,7 +344,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"support converting onComplete into onError" in { "support converting onComplete into onError" in {
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
Flow(List(5, 1, 2, 3)).transform(new Transformer[Int, Int] { Flow(List(5, 1, 2, 3)).transform("transform", () new Transformer[Int, Int] {
var expectedNumberOfElements: Option[Int] = None var expectedNumberOfElements: Option[Int] = None
var count = 0 var count = 0
override def onNext(elem: Int) = override def onNext(elem: Int) =
@ -354,7 +359,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
case Some(e) Nil case Some(e) Nil
case None case None
expectedNumberOfElements match { expectedNumberOfElements match {
case Some(expected) if (count != expected) case Some(expected) if count != expected
throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace throw new RuntimeException(s"Expected $expected, got $count") with NoStackTrace
case _ Nil case _ Nil
} }
@ -367,7 +372,31 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
subscriber.expectNext(1) subscriber.expectNext(1)
subscriber.expectNext(2) subscriber.expectNext(2)
subscriber.expectNext(3) subscriber.expectNext(3)
subscriber.expectError.getMessage should be("Expected 5, got 3") subscriber.expectError().getMessage should be("Expected 5, got 3")
}
"be safe to reuse" in {
val flow = Flow(1 to 3).transform("transform", ()
new Transformer[Int, Int] {
var count = 0
override def onNext(elem: Int): Seq[Int] = {
count += 1
List(count)
}
})
val s1 = StreamTestKit.SubscriberProbe[Int]()
flow.produceTo(s1)
s1.expectSubscription().request(3)
s1.expectNext(1, 2, 3)
s1.expectComplete()
val s2 = StreamTestKit.SubscriberProbe[Int]()
flow.produceTo(s2)
s2.expectSubscription().request(3)
s2.expectNext(1, 2, 3)
s2.expectComplete()
} }
} }