diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index e5efbf1b55..8f4b0050a9 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -14,7 +14,7 @@ import org.reactivestreams.api.Consumer object FlowMaterializer { /** - * Creates a FlowMaterializer which will execute every step of a transformation + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object @@ -27,6 +27,15 @@ object FlowMaterializer { def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = new ActorBasedFlowMaterializer(settings, context, namePrefix.getOrElse("flow")) + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + */ + def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = + apply(settings)(context) } /** @@ -36,7 +45,7 @@ object FlowMaterializer { * steps are split up into asynchronous regions is implementation * dependent. */ -trait FlowMaterializer { +abstract class FlowMaterializer { /** * The `namePrefix` is used as the first part of the names of the actors running @@ -71,6 +80,16 @@ trait FlowMaterializer { } +object MaterializerSettings { + private val defaultSettings = new MaterializerSettings + /** + * Java API: Default settings. + * Refine the settings using [[MaterializerSettings#withBuffer]], + * [[MaterializerSettings#withFanOut]], [[MaterializerSettings#withSubscriptionTimeout]] + */ + def create(): MaterializerSettings = defaultSettings +} + /** * The buffers employed by the generated Processors can be configured by * creating an appropriate instance of this class. @@ -97,5 +116,20 @@ case class MaterializerSettings( require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two") require(initialInputBufferSize <= maximumInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)") + + def withBuffer(initialInputBufferSize: Int, maximumInputBufferSize: Int): MaterializerSettings = + copy(initialInputBufferSize = initialInputBufferSize, maximumInputBufferSize = maximumInputBufferSize) + + def withFanOut(initialFanOutBufferSize: Int, maxFanOutBufferSize: Int): MaterializerSettings = + copy(initialFanOutBufferSize = initialFanOutBufferSize, maxFanOutBufferSize = maxFanOutBufferSize) + + def withSubscriptionTimeout(timeout: FiniteDuration): MaterializerSettings = + copy(upstreamSubscriptionTimeout = timeout, downstreamSubscriptionTimeout = timeout) + + def withSubscriptionTimeout(upstreamSubscriptionTimeout: FiniteDuration, + downstreamSubscriptionTimeout: FiniteDuration): MaterializerSettings = + copy(upstreamSubscriptionTimeout = upstreamSubscriptionTimeout, + downstreamSubscriptionTimeout = downstreamSubscriptionTimeout) + } diff --git a/akka-stream/src/main/scala/akka/stream/Support.scala b/akka-stream/src/main/scala/akka/stream/Support.scala index 2dd77b8852..5dfdf97be9 100644 --- a/akka-stream/src/main/scala/akka/stream/Support.scala +++ b/akka-stream/src/main/scala/akka/stream/Support.scala @@ -10,4 +10,9 @@ import scala.util.control.NoStackTrace * signal the end of stream (if the produced stream is not infinite). This is used for example in * [[akka.stream.scaladsl.Flow#apply]] (the variant which takes a closure). */ -case object Stop extends RuntimeException("Stop this flow") with NoStackTrace +case object Stop extends RuntimeException("Stop this flow") with NoStackTrace { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} diff --git a/akka-stream/src/main/scala/akka/stream/Transformer.scala b/akka-stream/src/main/scala/akka/stream/Transformer.scala new file mode 100644 index 0000000000..110a5319d0 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/Transformer.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.collection.immutable + +/** + * General interface for stream transformation. + * + * It is possible to keep state in the concrete [[Transformer]] instance with + * ordinary instance variables. The [[Transformer]] is executed by an actor and + * therefore you don not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * @see [[akka.stream.scaladsl.Flow#transform]] + * @see [[akka.stream.javadsl.Flow#transform]] + */ +abstract class Transformer[-T, +U] { + /** + * Invoked for each element to produce a (possibly empty) sequence of + * output elements. + */ + def onNext(element: T): immutable.Seq[U] + + /** + * Invoked after handing off the elements produced from one input element to the + * downstream consumers to determine whether to end stream processing at this point; + * in that case the upstream subscription is canceled. + */ + def isComplete: Boolean = false + + /** + * Invoked before signaling normal completion to the downstream consumers + * to produce a (possibly empty) sequence of elements in response to the + * end-of-stream event. + */ + def onComplete(): immutable.Seq[U] = Nil + + /** + * Invoked when failure is signaled from upstream. + */ + def onError(cause: Throwable): Unit = () + + /** + * Invoked after normal completion or error. + */ + def cleanup(): Unit = () + + /** + * Name of this transformation step. Used as part of the actor name. + * Facilitates debugging and logging. + */ + def name: String = "transform" +} + +/** + * General interface for stream transformation. + * @see [[akka.stream.scaladsl.Flow#transformRecover]] + * @see [[akka.stream.javadsl.Flow#transformRecover]] + * @see [[Transformer]] + */ +abstract class RecoveryTransformer[-T, +U] extends Transformer[T, U] { + /** + * Invoked when failure is signaled from upstream to emit an additional + * sequence of elements before the stream ends. + */ + def onErrorRecover(cause: Throwable): immutable.Seq[U] + + /** + * Name of this transformation step. Used as part of the actor name. + * Facilitates debugging and logging. + */ + override def name: String = "transformRecover" +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala index 555dd96a28..16466c33f9 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala @@ -7,7 +7,8 @@ import akka.stream.scaladsl.Duct import akka.stream.scaladsl.Flow /** - * Additional [[Flow]] and [[Duct]] operators. + * Additional [[akka.stream.scaladsl.Flow]] and [[akka.stream.scaladsl.Duct]] + * operators. */ object Implicits { diff --git a/akka-stream/src/main/scala/akka/stream/extra/Log.scala b/akka-stream/src/main/scala/akka/stream/extra/Log.scala index ec660fb664..f726cccb30 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Log.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Log.scala @@ -4,14 +4,14 @@ package akka.stream.extra import scala.collection.immutable -import akka.stream.scaladsl.Transformer +import akka.stream.Transformer import akka.stream.impl.ActorBasedFlowMaterializer import akka.actor.ActorContext import akka.event.LoggingAdapter import akka.event.Logging /** - * Scala API: Mix in TransformerLogging into your [[akka.stream.scaladsl.Transformer]] + * Scala API: Mix in TransformerLogging into your [[akka.stream.Transformer]] * to obtain a reference to a logger, which is available under the name [[#log]]. */ trait TransformerLogging { this: Transformer[_, _] ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 47fb0cbf81..69c51fcb6c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -9,8 +9,8 @@ import org.reactivestreams.api.{ Consumer, Processor, Producer } import org.reactivestreams.spi.Subscriber import akka.actor.ActorRefFactory import akka.stream.{ MaterializerSettings, FlowMaterializer } -import akka.stream.scaladsl.Transformer -import akka.stream.scaladsl.RecoveryTransformer +import akka.stream.Transformer +import akka.stream.RecoveryTransformer import scala.util.Try import scala.concurrent.Future import scala.util.Success diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala index cbbb12c418..1253f04281 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala @@ -11,8 +11,8 @@ import org.reactivestreams.spi.{ Subscriber, Subscription } import Ast.{ AstNode, Recover, Transform } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala } import akka.stream.MaterializerSettings -import akka.stream.scaladsl.Transformer -import akka.stream.scaladsl.RecoveryTransformer +import akka.stream.Transformer +import akka.stream.RecoveryTransformer /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 81ab08f56f..3d5f3e0cb9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -13,8 +13,8 @@ import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow import scala.util.Success import scala.util.Failure -import akka.stream.scaladsl.Transformer -import akka.stream.scaladsl.RecoveryTransformer +import akka.stream.Transformer +import akka.stream.RecoveryTransformer import org.reactivestreams.api.Consumer import akka.stream.scaladsl.Duct diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 86526c8120..9a99129c1d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -7,8 +7,8 @@ import scala.collection.immutable import scala.util.{ Failure, Success } import akka.actor.Props import akka.stream.MaterializerSettings -import akka.stream.scaladsl.RecoveryTransformer -import akka.stream.scaladsl.Transformer +import akka.stream.RecoveryTransformer +import akka.stream.Transformer import scala.util.control.NonFatal /** diff --git a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala index 089b306fd6..edc2862289 100644 --- a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala +++ b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala @@ -15,6 +15,7 @@ import akka.stream.impl.{ ActorPublisher, ExposedPublisher, ActorProcessor } import akka.stream.MaterializerSettings import akka.io.IO import java.net.URLEncoder +import akka.japi.Util object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { @@ -41,17 +42,132 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { } } - case class Connect(remoteAddress: InetSocketAddress, + /** + * The Connect message is sent to the StreamTcp manager actor, which is obtained via + * `IO(StreamTcp)`. The manager replies with a [[StreamTcp.OutgoingTcpConnection]] + * message. + * + * @param remoteAddress is the address to connect to + * @param localAddress optionally specifies a specific address to bind to + * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. + * @param timeout is the desired connection timeout, `null` means "no timeout" + */ + case class Connect(settings: MaterializerSettings, + remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil, - timeout: Option[FiniteDuration] = None, - settings: MaterializerSettings) + timeout: Option[FiniteDuration] = None) { - case class Bind(localAddress: InetSocketAddress, + /** + * Java API + */ + def withLocalAddress(localAddress: InetSocketAddress): Connect = + copy(localAddress = Option(localAddress)) + + /** + * Java API + */ + def withSocketOptions(options: java.lang.Iterable[SocketOption]): Connect = + copy(options = Util.immutableSeq(options)) + + /** + * Java API + */ + def withTimeout(timeout: FiniteDuration): Connect = + copy(timeout = Option(timeout)) + } + + /** + * The Bind message is send to the StreamTcp manager actor, which is obtained via + * `IO(StreamTcp)`, in order to bind to a listening socket. The manager + * replies with a [[StreamTcp.TcpServerBinding]]. If the local port is set to 0 in + * the Bind message, then the [[StreamTcp.TcpServerBinding]] message should be inspected to find + * the actual port which was bound to. + * + * @param localAddress The socket address to bind to; use port zero for + * automatic assignment (i.e. an ephemeral port) + * + * @param backlog This specifies the number of unaccepted connections the O/S + * kernel will hold for this port before refusing connections. + * + * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. + */ + case class Bind(settings: MaterializerSettings, + localAddress: InetSocketAddress, backlog: Int = 100, - options: immutable.Traversable[SocketOption] = Nil, - settings: MaterializerSettings) + options: immutable.Traversable[SocketOption] = Nil) { + /** + * Java API + */ + def withBacklog(backlog: Int): Bind = copy(backlog = backlog) + + /** + * Java API + */ + def withSocketOptions(options: java.lang.Iterable[SocketOption]): Bind = + copy(options = Util.immutableSeq(options)) + + } + +} + +/** + * Java API: Factory methods for the messages of `StreamTcp`. + */ +object StreamTcpMessage { + /** + * Java API: The Connect message is sent to the StreamTcp manager actor, which is obtained via + * `StreamTcp.get(system).manager()`. The manager replies with a [[StreamTcp.OutgoingTcpConnection]] + * message. + * + * @param remoteAddress is the address to connect to + * @param localAddress optionally specifies a specific address to bind to + * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. + * @param timeout is the desired connection timeout, `null` means "no timeout" + */ + def connect( + settings: MaterializerSettings, + remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: java.lang.Iterable[SocketOption], + timeout: FiniteDuration): StreamTcp.Connect = + StreamTcp.Connect(settings, remoteAddress, Option(localAddress), Util.immutableSeq(options), Option(timeout)) + + /** + * Java API: Message to Connect to the given `remoteAddress` without binding to a local address and without + * specifying options. + */ + def connect(settings: MaterializerSettings, remoteAddress: InetSocketAddress): StreamTcp.Connect = + StreamTcp.Connect(settings, remoteAddress) + + /** + * Java API: The Bind message is send to the StreamTcp manager actor, which is obtained via + * `StreamTcp.get(system).manager()`, in order to bind to a listening socket. The manager + * replies with a [[StreamTcp.TcpServerBinding]]. If the local port is set to 0 in + * the Bind message, then the [[StreamTcp.TcpServerBinding]] message should be inspected to find + * the actual port which was bound to. + * + * @param localAddress The socket address to bind to; use port zero for + * automatic assignment (i.e. an ephemeral port) + * + * @param backlog This specifies the number of unaccepted connections the O/S + * kernel will hold for this port before refusing connections. + * + * @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options. + */ + def bind(settings: MaterializerSettings, + localAddress: InetSocketAddress, + backlog: Int, + options: java.lang.Iterable[SocketOption]): StreamTcp.Bind = + StreamTcp.Bind(settings, localAddress, backlog, Util.immutableSeq(options)) + + /** + * Java API: Message to open a listening socket without specifying options. + */ + def bind(settings: MaterializerSettings, + localAddress: InetSocketAddress): StreamTcp.Bind = + StreamTcp.Bind(settings, localAddress) } /** @@ -81,14 +197,14 @@ private[akka] class StreamTcpManager extends Actor { } def receive: Receive = { - case StreamTcp.Connect(remoteAddress, localAddress, options, timeout, settings) ⇒ + case StreamTcp.Connect(settings, remoteAddress, localAddress, options, timeout) ⇒ val processorActor = context.actorOf(TcpStreamActor.outboundProps( Tcp.Connect(remoteAddress, localAddress, options, timeout, pullMode = true), requester = sender(), settings), name = encName("client", remoteAddress)) processorActor ! ExposedProcessor(new ActorProcessor[ByteString, ByteString](processorActor)) - case StreamTcp.Bind(localAddress, backlog, options, settings) ⇒ + case StreamTcp.Bind(settings, localAddress, backlog, options) ⇒ val publisherActor = context.actorOf(TcpListenStreamActor.props( Tcp.Bind(context.system.deadLetters, localAddress, backlog, options, pullMode = true), requester = sender(), diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala new file mode 100644 index 0000000000..29d32d5bab --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -0,0 +1,394 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl + +import java.util.concurrent.Callable +import scala.collection.JavaConverters._ +import scala.collection.immutable +import scala.concurrent.Future +import scala.util.Failure +import scala.util.Success +import org.reactivestreams.api.Producer +import akka.japi.Function +import akka.japi.Function2 +import akka.japi.Procedure +import akka.japi.Util.immutableSeq +import akka.stream.FlowMaterializer +import akka.stream.scaladsl.{ Flow ⇒ SFlow } +import akka.stream.Transformer +import akka.stream.RecoveryTransformer +import org.reactivestreams.api.Consumer + +/** + * Java API + */ +object Flow { + + /** + * Construct a transformation of the given producer. The transformation steps + * are executed by a series of [[org.reactivestreams.api.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + */ + def create[T](producer: Producer[T]): Flow[T] = new FlowAdapter(SFlow.apply(producer)) + + /** + * Start a new flow from the given Iterator. The produced stream of elements + * will continue until the iterator runs empty or fails during evaluation of + * the next() method. Elements are pulled out of the iterator + * in accordance with the demand coming from the downstream transformation + * steps. + */ + def create[T](iterator: java.util.Iterator[T]): Flow[T] = + new FlowAdapter(SFlow.apply(iterator.asScala)) + + /** + * Start a new flow from the given Iterable. This is like starting from an + * Iterator, but every Consumer directly attached to the Producer of this + * stream will see an individual flow of elements (always starting from the + * beginning) regardless of when they subscribed. + */ + def create[T](iterable: java.lang.Iterable[T]): Flow[T] = { + val iterAdapter: immutable.Iterable[T] = new immutable.Iterable[T] { + override def iterator: Iterator[T] = iterable.iterator().asScala + } + new FlowAdapter(SFlow.apply(iterAdapter)) + } + + /** + * Define the sequence of elements to be produced by the given Callable. + * The stream ends normally when evaluation of the Callable results in + * a [[akka.stream.Stop]] exception being thrown; it ends exceptionally + * when any other exception is thrown. + */ + def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() ⇒ block.call())) + +} + +/** + * Java API: The Flow DSL allows the formulation of stream transformations based on some + * input. The starting point can be a collection, an iterator, a block of code + * which is evaluated repeatedly or a [[org.reactivestreams.api.Producer]]. + * + * See Reactive Streams for details. + * + * Each DSL element produces a new Flow that can be further transformed, building + * up a description of the complete transformation pipeline. In order to execute + * this pipeline the Flow must be materialized by calling the [[#toFuture]], [[#consume]], + * [[#onComplete]], or [[#toProducer]] methods on it. + * + * It should be noted that the streams modeled by this library are “hot”, + * meaning that they asynchronously flow through a series of processors without + * detailed control by the user. In particular it is not predictable how many + * elements a given transformation step might buffer before handing elements + * downstream, which means that transformation functions may be invoked more + * often than for corresponding transformations on strict collections like + * `List`. *An important consequence* is that elements that were produced + * into a stream may be discarded by later processors, e.g. when using the + * [[#take]] combinator. + * + * By default every operation is executed within its own [[akka.actor.Actor]] + * to enable full pipelining of the chained set of computations. This behavior + * is determined by the [[akka.stream.FlowMaterializer]] which is required + * by those methods that materialize the Flow into a series of + * [[org.reactivestreams.api.Processor]] instances. The returned reactive stream + * is fully started and active. + */ +abstract class Flow[T] { + + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. + */ + def map[U](f: Function[T, U]): Flow[U] + + /** + * Only pass on those elements that satisfy the given predicate. + */ + def filter(p: Predicate[T]): Flow[T] + + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step. + * Non-matching elements are filtered out. + * + * Use [[akka.japi.pf.PFBuilder]] to construct the `PartialFunction`. + */ + def collect[U](pf: PartialFunction[T, U]): Flow[U] + + /** + * Invoke the given procedure for each received element and produce a Unit value + * upon reaching the normal end of the stream. Please note that also in this case + * the flow needs to be materialized (e.g. using [[#consume]]) to initiate its + * execution. + */ + def foreach(c: Procedure[T]): Flow[Unit] + + /** + * Invoke the given function for every received element, giving it its previous + * output (or the given “zero” value) and the element as input. The returned stream + * will receive the return value of the final function evaluation when the input + * stream ends. + */ + def fold[U](zero: U, f: Function2[U, T, U]): Flow[U] + + /** + * Discard the given number of elements at the beginning of the stream. + */ + def drop(n: Int): Flow[T] + + /** + * Terminate processing (and cancel the upstream producer) after the given + * number of elements. Due to input buffering some elements may have been + * requested from upstream producers that will then not be processed downstream + * of this step. + */ + def take(n: Int): Flow[T] + + /** + * Chunk up this stream into groups of the given size, with the last group + * possibly smaller than requested due to end-of-stream. + */ + def grouped(n: Int): Flow[java.util.List[T]] + + /** + * Transform each input element into a sequence of output elements that is + * then flattened into the output stream. + */ + def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U] + + /** + * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] + * function is invoked and expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * consumers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream consumers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you don not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + */ + def transform[U](transformer: Transformer[T, U]): Flow[U] + + /** + * This transformation stage works exactly like [[#transform]] with the + * change that failure signaled from upstream will invoke + * [[akka.stream.RecoveryTransformer#onError]], which can emit an additional sequence of + * elements before the stream ends. + * + * After normal completion or error the [[akka.stream.RecoveryTransformer#cleanup]] function is called. + */ + def transformRecover[U](transformer: RecoveryTransformer[T, U]): Flow[U] + + /** + * This operation demultiplexes the incoming stream into separate output + * streams, one for each element key. The key is computed for each element + * using the given function. When a new key is encountered for the first time + * it is emitted to the downstream consumer together with a fresh + * producer that will eventually produce all the elements of the substream + * for that key. Not consuming the elements from the created streams will + * stop this processor from processing more elements, therefore you must take + * care to unblock (or cancel) all of the produced streams even if you want + * to consume only one of them. + */ + def groupBy[K](f: Function[T, K]): Flow[Pair[K, Producer[T]]] + + /** + * This operation applies the given predicate to all incoming elements and + * emits them to a stream of output streams, always beginning a new one with + * the current element if the given predicate returns true for it. This means + * that for the following series of predicate values, three substreams will + * be produced with lengths 1, 2, and 3: + * + * {{{ + * false, // element goes into first substream + * true, false, // elements go into second substream + * true, false, false // elements go into third substream + * }}} + */ + def splitWhen(p: Predicate[T]): Flow[Producer[T]] + + /** + * Merge this stream with the one emitted by the given producer, taking + * elements as they arrive from either side (picking randomly when both + * have elements ready). + */ + def merge[U >: T](other: Producer[U]): Flow[U] + + /** + * Zip this stream together with the one emitted by the given producer. + * This transformation finishes when either input stream reaches its end, + * cancelling the subscription to the other one. + */ + def zip[U](other: Producer[U]): Flow[Pair[T, U]] + + /** + * Concatenate the given other stream to this stream so that the first element + * emitted by the given producer is emitted after the last element of this + * stream. + */ + def concat[U >: T](next: Producer[U]): Flow[U] + + /** + * Fan-out the stream to another consumer. Each element is produced to + * the `other` consumer as well as to downstream consumers. It will + * not shutdown until the subscriptions for `other` and at least + * one downstream consumer have been established. + */ + def tee(other: Consumer[_ >: T]): Flow[T] + + /** + * Returns a [[scala.concurrent.Future]] that will be fulfilled with the first + * thing that is signaled to this stream, which can be either an element (after + * which the upstream subscription is canceled), an error condition (putting + * the Future into the corresponding failed state) or the end-of-stream + * (failing the Future with a NoSuchElementException). *This operation + * materializes the flow and initiates its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def toFuture(materializer: FlowMaterializer): Future[T] + + /** + * Attaches a consumer to this stream which will just discard all received + * elements. *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def consume(materializer: FlowMaterializer): Unit + + /** + * When this flow is completed, either through an error or normal + * completion, call the [[OnCompleteCallback#onComplete]] method. + * + * *This operation materializes the flow and initiates its execution.* + */ + def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit + + /** + * Materialize this flow and return the downstream-most + * [[org.reactivestreams.api.Producer]] interface. The stream will not have + * any consumers attached at this point, which means that after prefetching + * elements to fill the internal buffers it will assert back-pressure until + * a consumer connects and creates demand for elements to be emitted. + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def toProducer(materializer: FlowMaterializer): Producer[T] + +} + +/** + * @see [[Flow#onComplete]] + */ +trait OnCompleteCallback { + /** + * The parameter `e` will be `null` when the stream terminated + * normally, otherwise it will be the exception that caused + * the abnormal termination. + */ + def onComplete(e: Throwable) +} + +/** + * Java API: Represents a tuple of two elements. + */ +case class Pair[A, B](a: A, b: B) // FIXME move this to akka.japi.Pair in akka-actor + +/** + * Java API: Defines a criteria and determines whether the parameter meets this criteria. + */ +trait Predicate[T] { + // FIXME move this to akka.japi.Predicate in akka-actor + def test(param: T): Boolean +} + +/** + * INTERNAL API + */ +private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { + override def map[U](f: Function[T, U]): Flow[U] = new FlowAdapter(delegate.map(f.apply)) + + override def filter(p: Predicate[T]): Flow[T] = new FlowAdapter(delegate.filter(p.test)) + + override def collect[U](pf: PartialFunction[T, U]): Flow[U] = new FlowAdapter(delegate.collect(pf)) + + override def foreach(c: Procedure[T]): Flow[Unit] = new FlowAdapter(delegate.foreach(c.apply)) + + override def fold[U](zero: U, f: Function2[U, T, U]): Flow[U] = + new FlowAdapter(delegate.fold(zero) { case (a, b) ⇒ f.apply(a, b) }) + + override def drop(n: Int): Flow[T] = new FlowAdapter(delegate.drop(n)) + + override def take(n: Int): Flow[T] = new FlowAdapter(delegate.take(n)) + + override def grouped(n: Int): Flow[java.util.List[T]] = + new FlowAdapter(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step + + override def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U] = + new FlowAdapter(delegate.mapConcat(elem ⇒ immutableSeq(f.apply(elem)))) + + override def transform[U](transformer: Transformer[T, U]): Flow[U] = + new FlowAdapter(delegate.transform(new Transformer[T, U] { + override def onNext(in: T) = transformer.onNext(in) + override def isComplete = transformer.isComplete + override def onComplete() = transformer.onComplete() + override def onError(cause: Throwable) = transformer.onError(cause) + override def cleanup() = transformer.cleanup() + })) + + override def transformRecover[U](transformer: RecoveryTransformer[T, U]): Flow[U] = + new FlowAdapter(delegate.transform(new RecoveryTransformer[T, U] { + override def onNext(in: T) = transformer.onNext(in) + override def isComplete = transformer.isComplete + override def onComplete() = transformer.onComplete() + override def onError(cause: Throwable) = transformer.onError(cause) + override def onErrorRecover(cause: Throwable) = transformer.onErrorRecover(cause) + override def cleanup() = transformer.cleanup() + })) + + override def groupBy[K](f: Function[T, K]): Flow[Pair[K, Producer[T]]] = + new FlowAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step + + override def splitWhen(p: Predicate[T]): Flow[Producer[T]] = + new FlowAdapter(delegate.splitWhen(p.test)) + + override def merge[U >: T](other: Producer[U]): Flow[U] = + new FlowAdapter(delegate.merge(other)) + + override def zip[U](other: Producer[U]): Flow[Pair[T, U]] = + new FlowAdapter(delegate.zip(other).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step + + override def concat[U >: T](next: Producer[U]): Flow[U] = + new FlowAdapter(delegate.concat(next)) + + override def tee(other: Consumer[_ >: T]): Flow[T] = + new FlowAdapter(delegate.tee(other)) + + override def toFuture(materializer: FlowMaterializer): Future[T] = + delegate.toFuture(materializer) + + override def consume(materializer: FlowMaterializer): Unit = + delegate.consume(materializer) + + override def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit = + delegate.onComplete(materializer) { + case Success(_) ⇒ callback.onComplete(null) + case Failure(e) ⇒ callback.onComplete(e) + } + + override def toProducer(materializer: FlowMaterializer): Producer[T] = + delegate.toProducer(materializer) + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 30b76c1e46..9d4988e773 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -3,13 +3,15 @@ */ package akka.stream.scaladsl -import scala.collection.immutable -import akka.stream.impl.DuctImpl -import org.reactivestreams.api.Consumer -import akka.stream.FlowMaterializer import scala.annotation.unchecked.uncheckedVariance -import org.reactivestreams.api.Producer +import scala.collection.immutable import scala.util.Try +import org.reactivestreams.api.Consumer +import org.reactivestreams.api.Producer +import akka.stream.FlowMaterializer +import akka.stream.RecoveryTransformer +import akka.stream.Transformer +import akka.stream.impl.DuctImpl object Duct { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 37fe2f98f9..ac3bbc24fb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -7,14 +7,18 @@ import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.util.Try -import scala.util.control.NoStackTrace import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer import akka.stream.FlowMaterializer +import akka.stream.RecoveryTransformer +import akka.stream.Transformer import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } -import akka.stream.impl.FlowImpl import akka.stream.impl.Ast.FutureProducerNode +import akka.stream.impl.FlowImpl +/** + * Scala API + */ object Flow { /** * Construct a transformation of the given producer. The transformation steps @@ -60,7 +64,7 @@ object Flow { } /** - * The Flow DSL allows the formulation of stream transformations based on some + * Scala API: The Flow DSL allows the formulation of stream transformations based on some * input. The starting point can be a collection, an iterator, a block of code * which is evaluated repeatedly or a [[org.reactivestreams.api.Producer]]. * @@ -68,8 +72,8 @@ object Flow { * * Each DSL element produces a new Flow that can be further transformed, building * up a description of the complete transformation pipeline. In order to execute - * this pipeline the Flow must be materialized by calling the [[#toFuture]], [[#consume]] - * or [[#toProducer]] methods on it. + * this pipeline the Flow must be materialized by calling the [[#toFuture]], [[#consume]], + * [[#onComplete]], or [[#toProducer]] methods on it. * * It should be noted that the streams modeled by this library are “hot”, * meaning that they asynchronously flow through a series of processors without @@ -150,22 +154,22 @@ trait Flow[+T] { def mapConcat[U](f: T ⇒ immutable.Seq[U]): Flow[U] /** - * Generic transformation of a stream: for each element the [[Transformer#onNext]] + * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] * function is invoked, expecting a (possibly empty) sequence of output elements * to be produced. * After handing off the elements produced from one input element to the downstream - * consumers, the [[Transformer#isComplete]] predicate determines whether to end + * consumers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end * stream processing at this point; in that case the upstream subscription is * canceled. Before signaling normal completion to the downstream consumers, - * the [[Transformer#onComplete]] function is invoked to produce a (possibly empty) + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) * sequence of elements in response to the end-of-stream event. * - * [[Transformer#onError]] is called when failure is signaled from upstream. + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. * - * After normal completion or error the [[Transformer#cleanup]] function is called. + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. * - * It is possible to keep state in the concrete [[Transformer]] instance with - * ordinary instance variables. The [[Transformer]] is executed by an actor and + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and * therefore you don not have to add any additional thread safety or memory * visibility constructs to access the state from the callback methods. */ @@ -174,10 +178,10 @@ trait Flow[+T] { /** * This transformation stage works exactly like [[#transform]] with the * change that failure signaled from upstream will invoke - * [[RecoveryTransformer#onErrorRecover]], which can emit an additional sequence of + * [[akka.stream.RecoveryTransformer#onErrorRecover]], which can emit an additional sequence of * elements before the stream ends. * - * [[Transformer#onError]] is not called when failure is signaled from upstream. + * [[akka.stream.Transformer#onError]] is not called when failure is signaled from upstream. */ def transformRecover[U](recoveryTransformer: RecoveryTransformer[T, U]): Flow[U] @@ -293,70 +297,3 @@ trait Flow[+T] { } -/** - * General interface for stream transformation. - * - * It is possible to keep state in the concrete [[Transformer]] instance with - * ordinary instance variables. The [[Transformer]] is executed by an actor and - * therefore you don not have to add any additional thread safety or memory - * visibility constructs to access the state from the callback methods. - * - * @see [[Flow#transform]] - */ -trait Transformer[-T, +U] { - /** - * Invoked for each element to produce a (possibly empty) sequence of - * output elements. - */ - def onNext(element: T): immutable.Seq[U] - - /** - * Invoked after handing off the elements produced from one input element to the - * downstream consumers to determine whether to end stream processing at this point; - * in that case the upstream subscription is canceled. - */ - def isComplete: Boolean = false - - /** - * Invoked before signaling normal completion to the downstream consumers - * to produce a (possibly empty) sequence of elements in response to the - * end-of-stream event. - */ - def onComplete(): immutable.Seq[U] = Nil - - /** - * Invoked when failure is signaled from upstream. - */ - def onError(cause: Throwable): Unit = () - - /** - * Invoked after normal completion or error. - */ - def cleanup(): Unit = () - - /** - * Name of this transformation step. Used as part of the actor name. - * Facilitates debugging and logging. - */ - def name: String = "transform" -} - -/** - * General interface for stream transformation. - * @see [[Flow#transformRecover]] - * @see [[Transformer]] - */ -trait RecoveryTransformer[-T, +U] extends Transformer[T, U] { - /** - * Invoked when failure is signaled from upstream to emit an additional - * sequence of elements before the stream ends. - */ - def onErrorRecover(cause: Throwable): immutable.Seq[U] - - /** - * Name of this transformation step. Used as part of the actor name. - * Facilitates debugging and logging. - */ - override def name: String = "transformRecover" -} - diff --git a/akka-stream/src/test/java/akka/stream/javadsl/AkkaJUnitActorSystemResource.java b/akka-stream/src/test/java/akka/stream/javadsl/AkkaJUnitActorSystemResource.java new file mode 100644 index 0000000000..009a9f0f87 --- /dev/null +++ b/akka-stream/src/test/java/akka/stream/javadsl/AkkaJUnitActorSystemResource.java @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.javadsl; + +import org.junit.rules.ExternalResource; + +import akka.actor.ActorSystem; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; + +import com.typesafe.config.Config; + +// FIXME remove this copy, and use akka.testkit.AkkaJUnitActorSystemResource when +// akka-stream-experimental becomes a normal build project + +/** + * This is a resource for creating an actor system before test start and shut it + * down afterwards. + * + * To use it on a class level add this to your test class: + * + * @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = + * new AkkaJUnitActorSystemResource(name, config); + * + * private final ActorSystem system = + * actorSystemResource.getSystem(); + * + * + * To use it on a per test level add this to your test class: + * + * @Rule public AkkaJUnitActorSystemResource actorSystemResource = new + * AkkaJUnitActorSystemResource(name, config); + * + * private final ActorSystem system = actorSystemResource.getSystem(); + */ + +public class AkkaJUnitActorSystemResource extends ExternalResource { + private ActorSystem system = null; + private final String name; + private final Config config; + + private ActorSystem createSystem(String name, Config config) { + try { + if (config == null) + return ActorSystem.create(name); + else + return ActorSystem.create(name, config); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + + public AkkaJUnitActorSystemResource(String name, Config config) { + this.name = name; + this.config = config; + system = createSystem(name, config); + } + + public AkkaJUnitActorSystemResource(String name) { + this(name, AkkaSpec.testConf()); + } + + @Override + protected void before() throws Throwable { + // Sometimes the ExternalResource seems to be reused, and + // we don't run the constructor again, so if that's the case + // then create the system here + if (system == null) { + system = createSystem(name, config); + } + } + + @Override + protected void after() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + public ActorSystem getSystem() { + return system; + } +} diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java new file mode 100644 index 0000000000..95fd32080f --- /dev/null +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -0,0 +1,370 @@ +package akka.stream.javadsl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.junit.ClassRule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +import org.reactivestreams.api.Producer; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.japi.Function; +import akka.japi.Function2; +import akka.japi.Procedure; +import akka.japi.Util; +import akka.stream.FlowMaterializer; +import akka.stream.MaterializerSettings; +import akka.stream.RecoveryTransformer; +import akka.stream.Transformer; +import akka.stream.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; + +public class FlowTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("StashJavaAPI", + AkkaSpec.testConf()); + + final ActorSystem system = actorSystemResource.getSystem(); + + final MaterializerSettings settings = MaterializerSettings.create(); + final FlowMaterializer materializer = FlowMaterializer.create(settings, system); + + @Test + public void mustBeAbleToUseSimpleOperators() { + final JavaTestKit probe = new JavaTestKit(system); + final String[] lookup = { "a", "b", "c", "d", "e", "f" }; + final java.util.Iterator input = Arrays.asList(0, 1, 2, 3, 4, 5).iterator(); + Flow.create(input).drop(2).take(3).map(new Function() { + public String apply(Integer elem) { + return lookup[elem]; + } + }).filter(new Predicate() { + public boolean test(String elem) { + return !elem.equals("c"); + } + }).grouped(2).mapConcat(new Function, java.util.List>() { + public java.util.List apply(java.util.List elem) { + return elem; + } + }).fold("", new Function2() { + public String apply(String acc, String elem) { + return acc + elem; + } + }).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + + probe.expectMsgEquals("de"); + + } + + @Test + public void mustBeAbleToUseTransform() { + final JavaTestKit probe = new JavaTestKit(system); + final JavaTestKit probe2 = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); + // duplicate each element, stop after 4 elements, and emit sum to the end + Flow.create(input).transform(new Transformer() { + int sum = 0; + int count = 0; + + @Override + public scala.collection.immutable.Seq onNext(Integer element) { + sum += element; + count += 1; + return Util.immutableSeq(new Integer[] { element, element }); + } + + @Override + public boolean isComplete() { + return count == 4; + } + + @Override + public scala.collection.immutable.Seq onComplete() { + return Util.immutableSingletonSeq(sum); + } + + @Override + public void cleanup() { + probe2.getRef().tell("cleanup", ActorRef.noSender()); + } + }).foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + + probe.expectMsgEquals(0); + probe.expectMsgEquals(0); + probe.expectMsgEquals(1); + probe.expectMsgEquals(1); + probe.expectMsgEquals(2); + probe.expectMsgEquals(2); + probe.expectMsgEquals(3); + probe.expectMsgEquals(3); + probe.expectMsgEquals(6); + probe2.expectMsgEquals("cleanup"); + } + + @Test + public void mustBeAbleToUseTransformRecover() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); + Flow.create(input).map(new Function() { + public Integer apply(Integer elem) { + if (elem == 4) + throw new IllegalArgumentException("4 not allowed"); + else + return elem + elem; + } + }).transformRecover(new RecoveryTransformer() { + + @Override + public scala.collection.immutable.Seq onNext(Integer element) { + return Util.immutableSingletonSeq(element.toString()); + } + + @Override + public scala.collection.immutable.Seq onErrorRecover(Throwable e) { + return Util.immutableSingletonSeq(e.getMessage()); + } + + @Override + public boolean isComplete() { + return false; + } + + @Override + public scala.collection.immutable.Seq onComplete() { + return Util.immutableSeq(new String[0]); + } + + @Override + public void cleanup() { + } + + }).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + + probe.expectMsgEquals("0"); + probe.expectMsgEquals("2"); + probe.expectMsgEquals("4"); + probe.expectMsgEquals("6"); + probe.expectMsgEquals("4 not allowed"); + } + + @Test + public void mustBeAbleToUseGroupBy() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee"); + Flow.create(input).groupBy(new Function() { + public String apply(String elem) { + return elem.substring(0, 1); + } + }).foreach(new Procedure>>() { + public void apply(final Pair> pair) { + Flow.create(pair.b()).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(new Pair(pair.a(), elem), ActorRef.noSender()); + } + }).consume(materializer); + } + }).consume(materializer); + + Map> grouped = new HashMap>(); + for (Object o : probe.receiveN(5)) { + @SuppressWarnings("unchecked") + Pair p = (Pair) o; + List g = grouped.get(p.a()); + if (g == null) + g = new ArrayList(); + g.add(p.b()); + grouped.put(p.a(), g); + } + + assertEquals(Arrays.asList("Aaa", "Abb"), grouped.get("A")); + + } + + @Test + public void mustBeAbleToUseSplitWhen() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList("A", "B", "C", "\n", "D", "\n", "E", "F"); + Flow.create(input).splitWhen(new Predicate() { + public boolean test(String elem) { + return elem.equals("\n"); + } + }).foreach(new Procedure>() { + public void apply(Producer subProducer) { + Flow.create(subProducer).filter(new Predicate() { + public boolean test(String elem) { + return !elem.equals("\n"); + } + }).grouped(10).foreach(new Procedure>() { + public void apply(List chunk) { + probe.getRef().tell(chunk, ActorRef.noSender()); + } + }).consume(materializer); + } + }).consume(materializer); + + for (Object o : probe.receiveN(3)) { + @SuppressWarnings("unchecked") + List chunk = (List) o; + if (chunk.get(0).equals("A")) + assertEquals(Arrays.asList("A", "B", "C"), chunk); + else if (chunk.get(0).equals("D")) + assertEquals(Arrays.asList("D"), chunk); + else if (chunk.get(0).equals("E")) + assertEquals(Arrays.asList("E", "F"), chunk); + else + assertEquals("[A, B, C] or [D] or [E, F]", chunk); + } + + } + + @Test + public void mustBeAbleToUseMerge() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input1 = Arrays.asList("A", "B", "C"); + final java.lang.Iterable input2 = Arrays.asList("D", "E", "F"); + Flow.create(input1).merge(Flow.create(input2).toProducer(materializer)).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + + Set output = new HashSet(Arrays.asList(probe.receiveN(6))); + assertEquals(new HashSet(Arrays.asList("A", "B", "C", "D", "E", "F")), output); + } + + @Test + public void mustBeAbleToUseZip() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input1 = Arrays.asList("A", "B", "C"); + final java.lang.Iterable input2 = Arrays.asList(1, 2, 3); + Flow.create(input1).zip(Flow.create(input2).toProducer(materializer)) + .foreach(new Procedure>() { + public void apply(Pair elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + + List output = Arrays.asList(probe.receiveN(3)); + @SuppressWarnings("unchecked") + List> expected = Arrays.asList(new Pair("A", 1), new Pair( + "B", 2), new Pair("C", 3)); + assertEquals(expected, output); + } + + @Test + public void mustBeAbleToUseConcat() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input1 = Arrays.asList("A", "B", "C"); + final java.lang.Iterable input2 = Arrays.asList("D", "E", "F"); + Flow.create(input1).concat(Flow.create(input2).toProducer(materializer)).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + + List output = Arrays.asList(probe.receiveN(6)); + assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); + } + + @Test + public void mustBeAbleToUseCallableInput() { + final JavaTestKit probe = new JavaTestKit(system); + final Callable input = new Callable() { + int countdown = 5; + + @Override + public Integer call() { + if (countdown == 0) + throw akka.stream.Stop.getInstance(); + else { + countdown -= 1; + return countdown; + } + } + }; + Flow.create(input).foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }).consume(materializer); + + List output = Arrays.asList(probe.receiveN(5)); + assertEquals(Arrays.asList(4, 3, 2, 1, 0), output); + probe.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS)); + } + + @Test + public void mustBeAbleToUseOnCompleteSuccess() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList("A", "B", "C"); + Flow.create(input).onComplete(materializer, new OnCompleteCallback() { + @Override + public void onComplete(Throwable e) { + if (e == null) + probe.getRef().tell("done", ActorRef.noSender()); + else + probe.getRef().tell(e, ActorRef.noSender()); + } + }); + + probe.expectMsgEquals("done"); + } + + @Test + public void mustBeAbleToUseOnCompleteError() { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList("A", "B", "C"); + Flow.create(input).map(new Function() { + public String apply(String arg0) throws Exception { + throw new RuntimeException("simulated err"); + } + }).onComplete(materializer, new OnCompleteCallback() { + @Override + public void onComplete(Throwable e) { + if (e == null) + probe.getRef().tell("done", ActorRef.noSender()); + else + probe.getRef().tell(e, ActorRef.noSender()); + } + }); + + probe.expectMsgEquals("simulated err"); + } + + @Test + public void mustBeAbleToUseToFuture() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList("A", "B", "C"); + Future future = Flow.create(input).toFuture(materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("A", result); + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index 5be4a1d556..9d4fe7ffe6 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -11,7 +11,6 @@ import akka.testkit.EventFilter import scala.util.Failure import scala.util.control.NoStackTrace import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.RecoveryTransformer import scala.util.Try import scala.util.Success diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 317fec372d..7f69d2e461 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -10,7 +10,6 @@ import akka.testkit.EventFilter import com.typesafe.config.ConfigFactory import akka.stream.scaladsl.Flow import akka.testkit.TestProbe -import akka.stream.scaladsl.Transformer import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index e97e972375..20473e3bc1 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -14,7 +14,6 @@ import akka.stream.impl.Ast import akka.testkit.{ TestEvent, EventFilter } import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Transformer import akka.stream.testkit.AkkaSpec import java.util.concurrent.atomic.AtomicInteger diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala index 8ce8a4ff23..7b795c5133 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala @@ -12,7 +12,6 @@ import akka.actor.ActorRef import scala.collection.immutable.TreeSet import scala.util.control.NonFatal import akka.stream.impl.ActorBasedFlowMaterializer -import akka.stream.scaladsl.Transformer import akka.stream.impl.FlowNameCounter @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) diff --git a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala index 76b5981256..08b3e950e7 100644 --- a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -179,7 +179,7 @@ class TcpFlowSpec extends AkkaSpec { def connect(server: Server): (Processor[ByteString, ByteString], ServerConnection) = { val tcpProbe = TestProbe() - tcpProbe.send(IO(StreamTcp), StreamTcp.Connect(server.address, settings = settings)) + tcpProbe.send(IO(StreamTcp), StreamTcp.Connect(settings, server.address)) val client = server.waitAccept() val outgoingConnection = tcpProbe.expectMsgType[StreamTcp.OutgoingTcpConnection] @@ -188,13 +188,13 @@ class TcpFlowSpec extends AkkaSpec { def connect(serverAddress: InetSocketAddress): StreamTcp.OutgoingTcpConnection = { val connectProbe = TestProbe() - connectProbe.send(IO(StreamTcp), StreamTcp.Connect(serverAddress, settings = settings)) + connectProbe.send(IO(StreamTcp), StreamTcp.Connect(settings, serverAddress)) connectProbe.expectMsgType[StreamTcp.OutgoingTcpConnection] } def bind(serverAddress: InetSocketAddress = temporaryServerAddress): StreamTcp.TcpServerBinding = { val bindProbe = TestProbe() - bindProbe.send(IO(StreamTcp), StreamTcp.Bind(serverAddress, settings = settings)) + bindProbe.send(IO(StreamTcp), StreamTcp.Bind(settings, serverAddress)) bindProbe.expectMsgType[StreamTcp.TcpServerBinding] }