+str #15034 Add Flow javadsl
* move Transformer to akka.stream package * and Java API for StreamIO
This commit is contained in:
parent
91abadf78c
commit
aedc57eb66
20 changed files with 1129 additions and 115 deletions
|
|
@ -14,7 +14,7 @@ import org.reactivestreams.api.Consumer
|
|||
object FlowMaterializer {
|
||||
|
||||
/**
|
||||
* Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
|
|
@ -27,6 +27,15 @@ object FlowMaterializer {
|
|||
def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer =
|
||||
new ActorBasedFlowMaterializer(settings, context, namePrefix.getOrElse("flow"))
|
||||
|
||||
/**
|
||||
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
* to another actor if the factory is an ActorContext.
|
||||
*/
|
||||
def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer =
|
||||
apply(settings)(context)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -36,7 +45,7 @@ object FlowMaterializer {
|
|||
* steps are split up into asynchronous regions is implementation
|
||||
* dependent.
|
||||
*/
|
||||
trait FlowMaterializer {
|
||||
abstract class FlowMaterializer {
|
||||
|
||||
/**
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
|
|
@ -71,6 +80,16 @@ trait FlowMaterializer {
|
|||
|
||||
}
|
||||
|
||||
object MaterializerSettings {
|
||||
private val defaultSettings = new MaterializerSettings
|
||||
/**
|
||||
* Java API: Default settings.
|
||||
* Refine the settings using [[MaterializerSettings#withBuffer]],
|
||||
* [[MaterializerSettings#withFanOut]], [[MaterializerSettings#withSubscriptionTimeout]]
|
||||
*/
|
||||
def create(): MaterializerSettings = defaultSettings
|
||||
}
|
||||
|
||||
/**
|
||||
* The buffers employed by the generated Processors can be configured by
|
||||
* creating an appropriate instance of this class.
|
||||
|
|
@ -97,5 +116,20 @@ case class MaterializerSettings(
|
|||
require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two")
|
||||
require(initialInputBufferSize <= maximumInputBufferSize,
|
||||
s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)")
|
||||
|
||||
def withBuffer(initialInputBufferSize: Int, maximumInputBufferSize: Int): MaterializerSettings =
|
||||
copy(initialInputBufferSize = initialInputBufferSize, maximumInputBufferSize = maximumInputBufferSize)
|
||||
|
||||
def withFanOut(initialFanOutBufferSize: Int, maxFanOutBufferSize: Int): MaterializerSettings =
|
||||
copy(initialFanOutBufferSize = initialFanOutBufferSize, maxFanOutBufferSize = maxFanOutBufferSize)
|
||||
|
||||
def withSubscriptionTimeout(timeout: FiniteDuration): MaterializerSettings =
|
||||
copy(upstreamSubscriptionTimeout = timeout, downstreamSubscriptionTimeout = timeout)
|
||||
|
||||
def withSubscriptionTimeout(upstreamSubscriptionTimeout: FiniteDuration,
|
||||
downstreamSubscriptionTimeout: FiniteDuration): MaterializerSettings =
|
||||
copy(upstreamSubscriptionTimeout = upstreamSubscriptionTimeout,
|
||||
downstreamSubscriptionTimeout = downstreamSubscriptionTimeout)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,4 +10,9 @@ import scala.util.control.NoStackTrace
|
|||
* signal the end of stream (if the produced stream is not infinite). This is used for example in
|
||||
* [[akka.stream.scaladsl.Flow#apply]] (the variant which takes a closure).
|
||||
*/
|
||||
case object Stop extends RuntimeException("Stop this flow") with NoStackTrace
|
||||
case object Stop extends RuntimeException("Stop this flow") with NoStackTrace {
|
||||
/**
|
||||
* Java API: get the singleton instance
|
||||
*/
|
||||
def getInstance = this
|
||||
}
|
||||
|
|
|
|||
75
akka-stream/src/main/scala/akka/stream/Transformer.scala
Normal file
75
akka-stream/src/main/scala/akka/stream/Transformer.scala
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
/**
|
||||
* General interface for stream transformation.
|
||||
*
|
||||
* It is possible to keep state in the concrete [[Transformer]] instance with
|
||||
* ordinary instance variables. The [[Transformer]] is executed by an actor and
|
||||
* therefore you don not have to add any additional thread safety or memory
|
||||
* visibility constructs to access the state from the callback methods.
|
||||
*
|
||||
* @see [[akka.stream.scaladsl.Flow#transform]]
|
||||
* @see [[akka.stream.javadsl.Flow#transform]]
|
||||
*/
|
||||
abstract class Transformer[-T, +U] {
|
||||
/**
|
||||
* Invoked for each element to produce a (possibly empty) sequence of
|
||||
* output elements.
|
||||
*/
|
||||
def onNext(element: T): immutable.Seq[U]
|
||||
|
||||
/**
|
||||
* Invoked after handing off the elements produced from one input element to the
|
||||
* downstream consumers to determine whether to end stream processing at this point;
|
||||
* in that case the upstream subscription is canceled.
|
||||
*/
|
||||
def isComplete: Boolean = false
|
||||
|
||||
/**
|
||||
* Invoked before signaling normal completion to the downstream consumers
|
||||
* to produce a (possibly empty) sequence of elements in response to the
|
||||
* end-of-stream event.
|
||||
*/
|
||||
def onComplete(): immutable.Seq[U] = Nil
|
||||
|
||||
/**
|
||||
* Invoked when failure is signaled from upstream.
|
||||
*/
|
||||
def onError(cause: Throwable): Unit = ()
|
||||
|
||||
/**
|
||||
* Invoked after normal completion or error.
|
||||
*/
|
||||
def cleanup(): Unit = ()
|
||||
|
||||
/**
|
||||
* Name of this transformation step. Used as part of the actor name.
|
||||
* Facilitates debugging and logging.
|
||||
*/
|
||||
def name: String = "transform"
|
||||
}
|
||||
|
||||
/**
|
||||
* General interface for stream transformation.
|
||||
* @see [[akka.stream.scaladsl.Flow#transformRecover]]
|
||||
* @see [[akka.stream.javadsl.Flow#transformRecover]]
|
||||
* @see [[Transformer]]
|
||||
*/
|
||||
abstract class RecoveryTransformer[-T, +U] extends Transformer[T, U] {
|
||||
/**
|
||||
* Invoked when failure is signaled from upstream to emit an additional
|
||||
* sequence of elements before the stream ends.
|
||||
*/
|
||||
def onErrorRecover(cause: Throwable): immutable.Seq[U]
|
||||
|
||||
/**
|
||||
* Name of this transformation step. Used as part of the actor name.
|
||||
* Facilitates debugging and logging.
|
||||
*/
|
||||
override def name: String = "transformRecover"
|
||||
}
|
||||
|
|
@ -7,7 +7,8 @@ import akka.stream.scaladsl.Duct
|
|||
import akka.stream.scaladsl.Flow
|
||||
|
||||
/**
|
||||
* Additional [[Flow]] and [[Duct]] operators.
|
||||
* Additional [[akka.stream.scaladsl.Flow]] and [[akka.stream.scaladsl.Duct]]
|
||||
* operators.
|
||||
*/
|
||||
object Implicits {
|
||||
|
||||
|
|
|
|||
|
|
@ -4,14 +4,14 @@
|
|||
package akka.stream.extra
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.impl.ActorBasedFlowMaterializer
|
||||
import akka.actor.ActorContext
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.event.Logging
|
||||
|
||||
/**
|
||||
* Scala API: Mix in TransformerLogging into your [[akka.stream.scaladsl.Transformer]]
|
||||
* Scala API: Mix in TransformerLogging into your [[akka.stream.Transformer]]
|
||||
* to obtain a reference to a logger, which is available under the name [[#log]].
|
||||
*/
|
||||
trait TransformerLogging { this: Transformer[_, _] ⇒
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ import org.reactivestreams.api.{ Consumer, Processor, Producer }
|
|||
import org.reactivestreams.spi.Subscriber
|
||||
import akka.actor.ActorRefFactory
|
||||
import akka.stream.{ MaterializerSettings, FlowMaterializer }
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import akka.stream.scaladsl.RecoveryTransformer
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.RecoveryTransformer
|
||||
import scala.util.Try
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Success
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ import org.reactivestreams.spi.{ Subscriber, Subscription }
|
|||
import Ast.{ AstNode, Recover, Transform }
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala }
|
||||
import akka.stream.MaterializerSettings
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import akka.stream.scaladsl.RecoveryTransformer
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.RecoveryTransformer
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ import akka.stream.FlowMaterializer
|
|||
import akka.stream.scaladsl.Flow
|
||||
import scala.util.Success
|
||||
import scala.util.Failure
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import akka.stream.scaladsl.RecoveryTransformer
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.RecoveryTransformer
|
||||
import org.reactivestreams.api.Consumer
|
||||
import akka.stream.scaladsl.Duct
|
||||
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@ import scala.collection.immutable
|
|||
import scala.util.{ Failure, Success }
|
||||
import akka.actor.Props
|
||||
import akka.stream.MaterializerSettings
|
||||
import akka.stream.scaladsl.RecoveryTransformer
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import akka.stream.RecoveryTransformer
|
||||
import akka.stream.Transformer
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import akka.stream.impl.{ ActorPublisher, ExposedPublisher, ActorProcessor }
|
|||
import akka.stream.MaterializerSettings
|
||||
import akka.io.IO
|
||||
import java.net.URLEncoder
|
||||
import akka.japi.Util
|
||||
|
||||
object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
|
||||
|
||||
|
|
@ -41,17 +42,132 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
|
|||
}
|
||||
}
|
||||
|
||||
case class Connect(remoteAddress: InetSocketAddress,
|
||||
/**
|
||||
* The Connect message is sent to the StreamTcp manager actor, which is obtained via
|
||||
* `IO(StreamTcp)`. The manager replies with a [[StreamTcp.OutgoingTcpConnection]]
|
||||
* message.
|
||||
*
|
||||
* @param remoteAddress is the address to connect to
|
||||
* @param localAddress optionally specifies a specific address to bind to
|
||||
* @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options.
|
||||
* @param timeout is the desired connection timeout, `null` means "no timeout"
|
||||
*/
|
||||
case class Connect(settings: MaterializerSettings,
|
||||
remoteAddress: InetSocketAddress,
|
||||
localAddress: Option[InetSocketAddress] = None,
|
||||
options: immutable.Traversable[SocketOption] = Nil,
|
||||
timeout: Option[FiniteDuration] = None,
|
||||
settings: MaterializerSettings)
|
||||
timeout: Option[FiniteDuration] = None) {
|
||||
|
||||
case class Bind(localAddress: InetSocketAddress,
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def withLocalAddress(localAddress: InetSocketAddress): Connect =
|
||||
copy(localAddress = Option(localAddress))
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def withSocketOptions(options: java.lang.Iterable[SocketOption]): Connect =
|
||||
copy(options = Util.immutableSeq(options))
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def withTimeout(timeout: FiniteDuration): Connect =
|
||||
copy(timeout = Option(timeout))
|
||||
}
|
||||
|
||||
/**
|
||||
* The Bind message is send to the StreamTcp manager actor, which is obtained via
|
||||
* `IO(StreamTcp)`, in order to bind to a listening socket. The manager
|
||||
* replies with a [[StreamTcp.TcpServerBinding]]. If the local port is set to 0 in
|
||||
* the Bind message, then the [[StreamTcp.TcpServerBinding]] message should be inspected to find
|
||||
* the actual port which was bound to.
|
||||
*
|
||||
* @param localAddress The socket address to bind to; use port zero for
|
||||
* automatic assignment (i.e. an ephemeral port)
|
||||
*
|
||||
* @param backlog This specifies the number of unaccepted connections the O/S
|
||||
* kernel will hold for this port before refusing connections.
|
||||
*
|
||||
* @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options.
|
||||
*/
|
||||
case class Bind(settings: MaterializerSettings,
|
||||
localAddress: InetSocketAddress,
|
||||
backlog: Int = 100,
|
||||
options: immutable.Traversable[SocketOption] = Nil,
|
||||
settings: MaterializerSettings)
|
||||
options: immutable.Traversable[SocketOption] = Nil) {
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def withBacklog(backlog: Int): Bind = copy(backlog = backlog)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def withSocketOptions(options: java.lang.Iterable[SocketOption]): Bind =
|
||||
copy(options = Util.immutableSeq(options))
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: Factory methods for the messages of `StreamTcp`.
|
||||
*/
|
||||
object StreamTcpMessage {
|
||||
/**
|
||||
* Java API: The Connect message is sent to the StreamTcp manager actor, which is obtained via
|
||||
* `StreamTcp.get(system).manager()`. The manager replies with a [[StreamTcp.OutgoingTcpConnection]]
|
||||
* message.
|
||||
*
|
||||
* @param remoteAddress is the address to connect to
|
||||
* @param localAddress optionally specifies a specific address to bind to
|
||||
* @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options.
|
||||
* @param timeout is the desired connection timeout, `null` means "no timeout"
|
||||
*/
|
||||
def connect(
|
||||
settings: MaterializerSettings,
|
||||
remoteAddress: InetSocketAddress,
|
||||
localAddress: InetSocketAddress,
|
||||
options: java.lang.Iterable[SocketOption],
|
||||
timeout: FiniteDuration): StreamTcp.Connect =
|
||||
StreamTcp.Connect(settings, remoteAddress, Option(localAddress), Util.immutableSeq(options), Option(timeout))
|
||||
|
||||
/**
|
||||
* Java API: Message to Connect to the given `remoteAddress` without binding to a local address and without
|
||||
* specifying options.
|
||||
*/
|
||||
def connect(settings: MaterializerSettings, remoteAddress: InetSocketAddress): StreamTcp.Connect =
|
||||
StreamTcp.Connect(settings, remoteAddress)
|
||||
|
||||
/**
|
||||
* Java API: The Bind message is send to the StreamTcp manager actor, which is obtained via
|
||||
* `StreamTcp.get(system).manager()`, in order to bind to a listening socket. The manager
|
||||
* replies with a [[StreamTcp.TcpServerBinding]]. If the local port is set to 0 in
|
||||
* the Bind message, then the [[StreamTcp.TcpServerBinding]] message should be inspected to find
|
||||
* the actual port which was bound to.
|
||||
*
|
||||
* @param localAddress The socket address to bind to; use port zero for
|
||||
* automatic assignment (i.e. an ephemeral port)
|
||||
*
|
||||
* @param backlog This specifies the number of unaccepted connections the O/S
|
||||
* kernel will hold for this port before refusing connections.
|
||||
*
|
||||
* @param options Please refer to [[akka.io.TcpSO]] for a list of all supported options.
|
||||
*/
|
||||
def bind(settings: MaterializerSettings,
|
||||
localAddress: InetSocketAddress,
|
||||
backlog: Int,
|
||||
options: java.lang.Iterable[SocketOption]): StreamTcp.Bind =
|
||||
StreamTcp.Bind(settings, localAddress, backlog, Util.immutableSeq(options))
|
||||
|
||||
/**
|
||||
* Java API: Message to open a listening socket without specifying options.
|
||||
*/
|
||||
def bind(settings: MaterializerSettings,
|
||||
localAddress: InetSocketAddress): StreamTcp.Bind =
|
||||
StreamTcp.Bind(settings, localAddress)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -81,14 +197,14 @@ private[akka] class StreamTcpManager extends Actor {
|
|||
}
|
||||
|
||||
def receive: Receive = {
|
||||
case StreamTcp.Connect(remoteAddress, localAddress, options, timeout, settings) ⇒
|
||||
case StreamTcp.Connect(settings, remoteAddress, localAddress, options, timeout) ⇒
|
||||
val processorActor = context.actorOf(TcpStreamActor.outboundProps(
|
||||
Tcp.Connect(remoteAddress, localAddress, options, timeout, pullMode = true),
|
||||
requester = sender(),
|
||||
settings), name = encName("client", remoteAddress))
|
||||
processorActor ! ExposedProcessor(new ActorProcessor[ByteString, ByteString](processorActor))
|
||||
|
||||
case StreamTcp.Bind(localAddress, backlog, options, settings) ⇒
|
||||
case StreamTcp.Bind(settings, localAddress, backlog, options) ⇒
|
||||
val publisherActor = context.actorOf(TcpListenStreamActor.props(
|
||||
Tcp.Bind(context.system.deadLetters, localAddress, backlog, options, pullMode = true),
|
||||
requester = sender(),
|
||||
|
|
|
|||
394
akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
Normal file
394
akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
Normal file
|
|
@ -0,0 +1,394 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.javadsl
|
||||
|
||||
import java.util.concurrent.Callable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import org.reactivestreams.api.Producer
|
||||
import akka.japi.Function
|
||||
import akka.japi.Function2
|
||||
import akka.japi.Procedure
|
||||
import akka.japi.Util.immutableSeq
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.scaladsl.{ Flow ⇒ SFlow }
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.RecoveryTransformer
|
||||
import org.reactivestreams.api.Consumer
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
object Flow {
|
||||
|
||||
/**
|
||||
* Construct a transformation of the given producer. The transformation steps
|
||||
* are executed by a series of [[org.reactivestreams.api.Processor]] instances
|
||||
* that mediate the flow of elements downstream and the propagation of
|
||||
* back-pressure upstream.
|
||||
*/
|
||||
def create[T](producer: Producer[T]): Flow[T] = new FlowAdapter(SFlow.apply(producer))
|
||||
|
||||
/**
|
||||
* Start a new flow from the given Iterator. The produced stream of elements
|
||||
* will continue until the iterator runs empty or fails during evaluation of
|
||||
* the <code>next()</code> method. Elements are pulled out of the iterator
|
||||
* in accordance with the demand coming from the downstream transformation
|
||||
* steps.
|
||||
*/
|
||||
def create[T](iterator: java.util.Iterator[T]): Flow[T] =
|
||||
new FlowAdapter(SFlow.apply(iterator.asScala))
|
||||
|
||||
/**
|
||||
* Start a new flow from the given Iterable. This is like starting from an
|
||||
* Iterator, but every Consumer directly attached to the Producer of this
|
||||
* stream will see an individual flow of elements (always starting from the
|
||||
* beginning) regardless of when they subscribed.
|
||||
*/
|
||||
def create[T](iterable: java.lang.Iterable[T]): Flow[T] = {
|
||||
val iterAdapter: immutable.Iterable[T] = new immutable.Iterable[T] {
|
||||
override def iterator: Iterator[T] = iterable.iterator().asScala
|
||||
}
|
||||
new FlowAdapter(SFlow.apply(iterAdapter))
|
||||
}
|
||||
|
||||
/**
|
||||
* Define the sequence of elements to be produced by the given Callable.
|
||||
* The stream ends normally when evaluation of the Callable results in
|
||||
* a [[akka.stream.Stop]] exception being thrown; it ends exceptionally
|
||||
* when any other exception is thrown.
|
||||
*/
|
||||
def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() ⇒ block.call()))
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: The Flow DSL allows the formulation of stream transformations based on some
|
||||
* input. The starting point can be a collection, an iterator, a block of code
|
||||
* which is evaluated repeatedly or a [[org.reactivestreams.api.Producer]].
|
||||
*
|
||||
* See <a href="https://github.com/reactive-streams/reactive-streams/">Reactive Streams</a> for details.
|
||||
*
|
||||
* Each DSL element produces a new Flow that can be further transformed, building
|
||||
* up a description of the complete transformation pipeline. In order to execute
|
||||
* this pipeline the Flow must be materialized by calling the [[#toFuture]], [[#consume]],
|
||||
* [[#onComplete]], or [[#toProducer]] methods on it.
|
||||
*
|
||||
* It should be noted that the streams modeled by this library are “hot”,
|
||||
* meaning that they asynchronously flow through a series of processors without
|
||||
* detailed control by the user. In particular it is not predictable how many
|
||||
* elements a given transformation step might buffer before handing elements
|
||||
* downstream, which means that transformation functions may be invoked more
|
||||
* often than for corresponding transformations on strict collections like
|
||||
* `List`. *An important consequence* is that elements that were produced
|
||||
* into a stream may be discarded by later processors, e.g. when using the
|
||||
* [[#take]] combinator.
|
||||
*
|
||||
* By default every operation is executed within its own [[akka.actor.Actor]]
|
||||
* to enable full pipelining of the chained set of computations. This behavior
|
||||
* is determined by the [[akka.stream.FlowMaterializer]] which is required
|
||||
* by those methods that materialize the Flow into a series of
|
||||
* [[org.reactivestreams.api.Processor]] instances. The returned reactive stream
|
||||
* is fully started and active.
|
||||
*/
|
||||
abstract class Flow[T] {
|
||||
|
||||
/**
|
||||
* Transform this stream by applying the given function to each of the elements
|
||||
* as they pass through this processing step.
|
||||
*/
|
||||
def map[U](f: Function[T, U]): Flow[U]
|
||||
|
||||
/**
|
||||
* Only pass on those elements that satisfy the given predicate.
|
||||
*/
|
||||
def filter(p: Predicate[T]): Flow[T]
|
||||
|
||||
/**
|
||||
* Transform this stream by applying the given partial function to each of the elements
|
||||
* on which the function is defined as they pass through this processing step.
|
||||
* Non-matching elements are filtered out.
|
||||
*
|
||||
* Use [[akka.japi.pf.PFBuilder]] to construct the `PartialFunction`.
|
||||
*/
|
||||
def collect[U](pf: PartialFunction[T, U]): Flow[U]
|
||||
|
||||
/**
|
||||
* Invoke the given procedure for each received element and produce a Unit value
|
||||
* upon reaching the normal end of the stream. Please note that also in this case
|
||||
* the flow needs to be materialized (e.g. using [[#consume]]) to initiate its
|
||||
* execution.
|
||||
*/
|
||||
def foreach(c: Procedure[T]): Flow[Unit]
|
||||
|
||||
/**
|
||||
* Invoke the given function for every received element, giving it its previous
|
||||
* output (or the given “zero” value) and the element as input. The returned stream
|
||||
* will receive the return value of the final function evaluation when the input
|
||||
* stream ends.
|
||||
*/
|
||||
def fold[U](zero: U, f: Function2[U, T, U]): Flow[U]
|
||||
|
||||
/**
|
||||
* Discard the given number of elements at the beginning of the stream.
|
||||
*/
|
||||
def drop(n: Int): Flow[T]
|
||||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream producer) after the given
|
||||
* number of elements. Due to input buffering some elements may have been
|
||||
* requested from upstream producers that will then not be processed downstream
|
||||
* of this step.
|
||||
*/
|
||||
def take(n: Int): Flow[T]
|
||||
|
||||
/**
|
||||
* Chunk up this stream into groups of the given size, with the last group
|
||||
* possibly smaller than requested due to end-of-stream.
|
||||
*/
|
||||
def grouped(n: Int): Flow[java.util.List[T]]
|
||||
|
||||
/**
|
||||
* Transform each input element into a sequence of output elements that is
|
||||
* then flattened into the output stream.
|
||||
*/
|
||||
def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U]
|
||||
|
||||
/**
|
||||
* Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]]
|
||||
* function is invoked and expecting a (possibly empty) sequence of output elements
|
||||
* to be produced.
|
||||
* After handing off the elements produced from one input element to the downstream
|
||||
* consumers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
|
||||
* stream processing at this point; in that case the upstream subscription is
|
||||
* canceled. Before signaling normal completion to the downstream consumers,
|
||||
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
|
||||
* sequence of elements in response to the end-of-stream event.
|
||||
*
|
||||
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
|
||||
*
|
||||
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
|
||||
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
|
||||
* therefore you don not have to add any additional thread safety or memory
|
||||
* visibility constructs to access the state from the callback methods.
|
||||
*/
|
||||
def transform[U](transformer: Transformer[T, U]): Flow[U]
|
||||
|
||||
/**
|
||||
* This transformation stage works exactly like [[#transform]] with the
|
||||
* change that failure signaled from upstream will invoke
|
||||
* [[akka.stream.RecoveryTransformer#onError]], which can emit an additional sequence of
|
||||
* elements before the stream ends.
|
||||
*
|
||||
* After normal completion or error the [[akka.stream.RecoveryTransformer#cleanup]] function is called.
|
||||
*/
|
||||
def transformRecover[U](transformer: RecoveryTransformer[T, U]): Flow[U]
|
||||
|
||||
/**
|
||||
* This operation demultiplexes the incoming stream into separate output
|
||||
* streams, one for each element key. The key is computed for each element
|
||||
* using the given function. When a new key is encountered for the first time
|
||||
* it is emitted to the downstream consumer together with a fresh
|
||||
* producer that will eventually produce all the elements of the substream
|
||||
* for that key. Not consuming the elements from the created streams will
|
||||
* stop this processor from processing more elements, therefore you must take
|
||||
* care to unblock (or cancel) all of the produced streams even if you want
|
||||
* to consume only one of them.
|
||||
*/
|
||||
def groupBy[K](f: Function[T, K]): Flow[Pair[K, Producer[T]]]
|
||||
|
||||
/**
|
||||
* This operation applies the given predicate to all incoming elements and
|
||||
* emits them to a stream of output streams, always beginning a new one with
|
||||
* the current element if the given predicate returns true for it. This means
|
||||
* that for the following series of predicate values, three substreams will
|
||||
* be produced with lengths 1, 2, and 3:
|
||||
*
|
||||
* {{{
|
||||
* false, // element goes into first substream
|
||||
* true, false, // elements go into second substream
|
||||
* true, false, false // elements go into third substream
|
||||
* }}}
|
||||
*/
|
||||
def splitWhen(p: Predicate[T]): Flow[Producer[T]]
|
||||
|
||||
/**
|
||||
* Merge this stream with the one emitted by the given producer, taking
|
||||
* elements as they arrive from either side (picking randomly when both
|
||||
* have elements ready).
|
||||
*/
|
||||
def merge[U >: T](other: Producer[U]): Flow[U]
|
||||
|
||||
/**
|
||||
* Zip this stream together with the one emitted by the given producer.
|
||||
* This transformation finishes when either input stream reaches its end,
|
||||
* cancelling the subscription to the other one.
|
||||
*/
|
||||
def zip[U](other: Producer[U]): Flow[Pair[T, U]]
|
||||
|
||||
/**
|
||||
* Concatenate the given other stream to this stream so that the first element
|
||||
* emitted by the given producer is emitted after the last element of this
|
||||
* stream.
|
||||
*/
|
||||
def concat[U >: T](next: Producer[U]): Flow[U]
|
||||
|
||||
/**
|
||||
* Fan-out the stream to another consumer. Each element is produced to
|
||||
* the `other` consumer as well as to downstream consumers. It will
|
||||
* not shutdown until the subscriptions for `other` and at least
|
||||
* one downstream consumer have been established.
|
||||
*/
|
||||
def tee(other: Consumer[_ >: T]): Flow[T]
|
||||
|
||||
/**
|
||||
* Returns a [[scala.concurrent.Future]] that will be fulfilled with the first
|
||||
* thing that is signaled to this stream, which can be either an element (after
|
||||
* which the upstream subscription is canceled), an error condition (putting
|
||||
* the Future into the corresponding failed state) or the end-of-stream
|
||||
* (failing the Future with a NoSuchElementException). *This operation
|
||||
* materializes the flow and initiates its execution.*
|
||||
*
|
||||
* The given FlowMaterializer decides how the flow’s logical structure is
|
||||
* broken down into individual processing steps.
|
||||
*/
|
||||
def toFuture(materializer: FlowMaterializer): Future[T]
|
||||
|
||||
/**
|
||||
* Attaches a consumer to this stream which will just discard all received
|
||||
* elements. *This will materialize the flow and initiate its execution.*
|
||||
*
|
||||
* The given FlowMaterializer decides how the flow’s logical structure is
|
||||
* broken down into individual processing steps.
|
||||
*/
|
||||
def consume(materializer: FlowMaterializer): Unit
|
||||
|
||||
/**
|
||||
* When this flow is completed, either through an error or normal
|
||||
* completion, call the [[OnCompleteCallback#onComplete]] method.
|
||||
*
|
||||
* *This operation materializes the flow and initiates its execution.*
|
||||
*/
|
||||
def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit
|
||||
|
||||
/**
|
||||
* Materialize this flow and return the downstream-most
|
||||
* [[org.reactivestreams.api.Producer]] interface. The stream will not have
|
||||
* any consumers attached at this point, which means that after prefetching
|
||||
* elements to fill the internal buffers it will assert back-pressure until
|
||||
* a consumer connects and creates demand for elements to be emitted.
|
||||
*
|
||||
* The given FlowMaterializer decides how the flow’s logical structure is
|
||||
* broken down into individual processing steps.
|
||||
*/
|
||||
def toProducer(materializer: FlowMaterializer): Producer[T]
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @see [[Flow#onComplete]]
|
||||
*/
|
||||
trait OnCompleteCallback {
|
||||
/**
|
||||
* The parameter `e` will be `null` when the stream terminated
|
||||
* normally, otherwise it will be the exception that caused
|
||||
* the abnormal termination.
|
||||
*/
|
||||
def onComplete(e: Throwable)
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: Represents a tuple of two elements.
|
||||
*/
|
||||
case class Pair[A, B](a: A, b: B) // FIXME move this to akka.japi.Pair in akka-actor
|
||||
|
||||
/**
|
||||
* Java API: Defines a criteria and determines whether the parameter meets this criteria.
|
||||
*/
|
||||
trait Predicate[T] {
|
||||
// FIXME move this to akka.japi.Predicate in akka-actor
|
||||
def test(param: T): Boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
|
||||
override def map[U](f: Function[T, U]): Flow[U] = new FlowAdapter(delegate.map(f.apply))
|
||||
|
||||
override def filter(p: Predicate[T]): Flow[T] = new FlowAdapter(delegate.filter(p.test))
|
||||
|
||||
override def collect[U](pf: PartialFunction[T, U]): Flow[U] = new FlowAdapter(delegate.collect(pf))
|
||||
|
||||
override def foreach(c: Procedure[T]): Flow[Unit] = new FlowAdapter(delegate.foreach(c.apply))
|
||||
|
||||
override def fold[U](zero: U, f: Function2[U, T, U]): Flow[U] =
|
||||
new FlowAdapter(delegate.fold(zero) { case (a, b) ⇒ f.apply(a, b) })
|
||||
|
||||
override def drop(n: Int): Flow[T] = new FlowAdapter(delegate.drop(n))
|
||||
|
||||
override def take(n: Int): Flow[T] = new FlowAdapter(delegate.take(n))
|
||||
|
||||
override def grouped(n: Int): Flow[java.util.List[T]] =
|
||||
new FlowAdapter(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step
|
||||
|
||||
override def mapConcat[U](f: Function[T, java.util.List[U]]): Flow[U] =
|
||||
new FlowAdapter(delegate.mapConcat(elem ⇒ immutableSeq(f.apply(elem))))
|
||||
|
||||
override def transform[U](transformer: Transformer[T, U]): Flow[U] =
|
||||
new FlowAdapter(delegate.transform(new Transformer[T, U] {
|
||||
override def onNext(in: T) = transformer.onNext(in)
|
||||
override def isComplete = transformer.isComplete
|
||||
override def onComplete() = transformer.onComplete()
|
||||
override def onError(cause: Throwable) = transformer.onError(cause)
|
||||
override def cleanup() = transformer.cleanup()
|
||||
}))
|
||||
|
||||
override def transformRecover[U](transformer: RecoveryTransformer[T, U]): Flow[U] =
|
||||
new FlowAdapter(delegate.transform(new RecoveryTransformer[T, U] {
|
||||
override def onNext(in: T) = transformer.onNext(in)
|
||||
override def isComplete = transformer.isComplete
|
||||
override def onComplete() = transformer.onComplete()
|
||||
override def onError(cause: Throwable) = transformer.onError(cause)
|
||||
override def onErrorRecover(cause: Throwable) = transformer.onErrorRecover(cause)
|
||||
override def cleanup() = transformer.cleanup()
|
||||
}))
|
||||
|
||||
override def groupBy[K](f: Function[T, K]): Flow[Pair[K, Producer[T]]] =
|
||||
new FlowAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step
|
||||
|
||||
override def splitWhen(p: Predicate[T]): Flow[Producer[T]] =
|
||||
new FlowAdapter(delegate.splitWhen(p.test))
|
||||
|
||||
override def merge[U >: T](other: Producer[U]): Flow[U] =
|
||||
new FlowAdapter(delegate.merge(other))
|
||||
|
||||
override def zip[U](other: Producer[U]): Flow[Pair[T, U]] =
|
||||
new FlowAdapter(delegate.zip(other).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step
|
||||
|
||||
override def concat[U >: T](next: Producer[U]): Flow[U] =
|
||||
new FlowAdapter(delegate.concat(next))
|
||||
|
||||
override def tee(other: Consumer[_ >: T]): Flow[T] =
|
||||
new FlowAdapter(delegate.tee(other))
|
||||
|
||||
override def toFuture(materializer: FlowMaterializer): Future[T] =
|
||||
delegate.toFuture(materializer)
|
||||
|
||||
override def consume(materializer: FlowMaterializer): Unit =
|
||||
delegate.consume(materializer)
|
||||
|
||||
override def onComplete(materializer: FlowMaterializer)(callback: OnCompleteCallback): Unit =
|
||||
delegate.onComplete(materializer) {
|
||||
case Success(_) ⇒ callback.onComplete(null)
|
||||
case Failure(e) ⇒ callback.onComplete(e)
|
||||
}
|
||||
|
||||
override def toProducer(materializer: FlowMaterializer): Producer[T] =
|
||||
delegate.toProducer(materializer)
|
||||
|
||||
}
|
||||
|
|
@ -3,13 +3,15 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.stream.impl.DuctImpl
|
||||
import org.reactivestreams.api.Consumer
|
||||
import akka.stream.FlowMaterializer
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import org.reactivestreams.api.Producer
|
||||
import scala.collection.immutable
|
||||
import scala.util.Try
|
||||
import org.reactivestreams.api.Consumer
|
||||
import org.reactivestreams.api.Producer
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.RecoveryTransformer
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.impl.DuctImpl
|
||||
|
||||
object Duct {
|
||||
|
||||
|
|
|
|||
|
|
@ -7,14 +7,18 @@ import scala.annotation.unchecked.uncheckedVariance
|
|||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Try
|
||||
import scala.util.control.NoStackTrace
|
||||
import org.reactivestreams.api.Consumer
|
||||
import org.reactivestreams.api.Producer
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.RecoveryTransformer
|
||||
import akka.stream.Transformer
|
||||
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode }
|
||||
import akka.stream.impl.FlowImpl
|
||||
import akka.stream.impl.Ast.FutureProducerNode
|
||||
import akka.stream.impl.FlowImpl
|
||||
|
||||
/**
|
||||
* Scala API
|
||||
*/
|
||||
object Flow {
|
||||
/**
|
||||
* Construct a transformation of the given producer. The transformation steps
|
||||
|
|
@ -60,7 +64,7 @@ object Flow {
|
|||
}
|
||||
|
||||
/**
|
||||
* The Flow DSL allows the formulation of stream transformations based on some
|
||||
* Scala API: The Flow DSL allows the formulation of stream transformations based on some
|
||||
* input. The starting point can be a collection, an iterator, a block of code
|
||||
* which is evaluated repeatedly or a [[org.reactivestreams.api.Producer]].
|
||||
*
|
||||
|
|
@ -68,8 +72,8 @@ object Flow {
|
|||
*
|
||||
* Each DSL element produces a new Flow that can be further transformed, building
|
||||
* up a description of the complete transformation pipeline. In order to execute
|
||||
* this pipeline the Flow must be materialized by calling the [[#toFuture]], [[#consume]]
|
||||
* or [[#toProducer]] methods on it.
|
||||
* this pipeline the Flow must be materialized by calling the [[#toFuture]], [[#consume]],
|
||||
* [[#onComplete]], or [[#toProducer]] methods on it.
|
||||
*
|
||||
* It should be noted that the streams modeled by this library are “hot”,
|
||||
* meaning that they asynchronously flow through a series of processors without
|
||||
|
|
@ -150,22 +154,22 @@ trait Flow[+T] {
|
|||
def mapConcat[U](f: T ⇒ immutable.Seq[U]): Flow[U]
|
||||
|
||||
/**
|
||||
* Generic transformation of a stream: for each element the [[Transformer#onNext]]
|
||||
* Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]]
|
||||
* function is invoked, expecting a (possibly empty) sequence of output elements
|
||||
* to be produced.
|
||||
* After handing off the elements produced from one input element to the downstream
|
||||
* consumers, the [[Transformer#isComplete]] predicate determines whether to end
|
||||
* consumers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
|
||||
* stream processing at this point; in that case the upstream subscription is
|
||||
* canceled. Before signaling normal completion to the downstream consumers,
|
||||
* the [[Transformer#onComplete]] function is invoked to produce a (possibly empty)
|
||||
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
|
||||
* sequence of elements in response to the end-of-stream event.
|
||||
*
|
||||
* [[Transformer#onError]] is called when failure is signaled from upstream.
|
||||
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
|
||||
*
|
||||
* After normal completion or error the [[Transformer#cleanup]] function is called.
|
||||
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
|
||||
*
|
||||
* It is possible to keep state in the concrete [[Transformer]] instance with
|
||||
* ordinary instance variables. The [[Transformer]] is executed by an actor and
|
||||
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
|
||||
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
|
||||
* therefore you don not have to add any additional thread safety or memory
|
||||
* visibility constructs to access the state from the callback methods.
|
||||
*/
|
||||
|
|
@ -174,10 +178,10 @@ trait Flow[+T] {
|
|||
/**
|
||||
* This transformation stage works exactly like [[#transform]] with the
|
||||
* change that failure signaled from upstream will invoke
|
||||
* [[RecoveryTransformer#onErrorRecover]], which can emit an additional sequence of
|
||||
* [[akka.stream.RecoveryTransformer#onErrorRecover]], which can emit an additional sequence of
|
||||
* elements before the stream ends.
|
||||
*
|
||||
* [[Transformer#onError]] is not called when failure is signaled from upstream.
|
||||
* [[akka.stream.Transformer#onError]] is not called when failure is signaled from upstream.
|
||||
*/
|
||||
def transformRecover[U](recoveryTransformer: RecoveryTransformer[T, U]): Flow[U]
|
||||
|
||||
|
|
@ -293,70 +297,3 @@ trait Flow[+T] {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* General interface for stream transformation.
|
||||
*
|
||||
* It is possible to keep state in the concrete [[Transformer]] instance with
|
||||
* ordinary instance variables. The [[Transformer]] is executed by an actor and
|
||||
* therefore you don not have to add any additional thread safety or memory
|
||||
* visibility constructs to access the state from the callback methods.
|
||||
*
|
||||
* @see [[Flow#transform]]
|
||||
*/
|
||||
trait Transformer[-T, +U] {
|
||||
/**
|
||||
* Invoked for each element to produce a (possibly empty) sequence of
|
||||
* output elements.
|
||||
*/
|
||||
def onNext(element: T): immutable.Seq[U]
|
||||
|
||||
/**
|
||||
* Invoked after handing off the elements produced from one input element to the
|
||||
* downstream consumers to determine whether to end stream processing at this point;
|
||||
* in that case the upstream subscription is canceled.
|
||||
*/
|
||||
def isComplete: Boolean = false
|
||||
|
||||
/**
|
||||
* Invoked before signaling normal completion to the downstream consumers
|
||||
* to produce a (possibly empty) sequence of elements in response to the
|
||||
* end-of-stream event.
|
||||
*/
|
||||
def onComplete(): immutable.Seq[U] = Nil
|
||||
|
||||
/**
|
||||
* Invoked when failure is signaled from upstream.
|
||||
*/
|
||||
def onError(cause: Throwable): Unit = ()
|
||||
|
||||
/**
|
||||
* Invoked after normal completion or error.
|
||||
*/
|
||||
def cleanup(): Unit = ()
|
||||
|
||||
/**
|
||||
* Name of this transformation step. Used as part of the actor name.
|
||||
* Facilitates debugging and logging.
|
||||
*/
|
||||
def name: String = "transform"
|
||||
}
|
||||
|
||||
/**
|
||||
* General interface for stream transformation.
|
||||
* @see [[Flow#transformRecover]]
|
||||
* @see [[Transformer]]
|
||||
*/
|
||||
trait RecoveryTransformer[-T, +U] extends Transformer[T, U] {
|
||||
/**
|
||||
* Invoked when failure is signaled from upstream to emit an additional
|
||||
* sequence of elements before the stream ends.
|
||||
*/
|
||||
def onErrorRecover(cause: Throwable): immutable.Seq[U]
|
||||
|
||||
/**
|
||||
* Name of this transformation step. Used as part of the actor name.
|
||||
* Facilitates debugging and logging.
|
||||
*/
|
||||
override def name: String = "transformRecover"
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.javadsl;
|
||||
|
||||
import org.junit.rules.ExternalResource;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.testkit.JavaTestKit;
|
||||
|
||||
import com.typesafe.config.Config;
|
||||
|
||||
// FIXME remove this copy, and use akka.testkit.AkkaJUnitActorSystemResource when
|
||||
// akka-stream-experimental becomes a normal build project
|
||||
|
||||
/**
|
||||
* This is a resource for creating an actor system before test start and shut it
|
||||
* down afterwards.
|
||||
*
|
||||
* To use it on a class level add this to your test class:
|
||||
*
|
||||
* @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource =
|
||||
* new AkkaJUnitActorSystemResource(name, config);
|
||||
*
|
||||
* private final ActorSystem system =
|
||||
* actorSystemResource.getSystem();
|
||||
*
|
||||
*
|
||||
* To use it on a per test level add this to your test class:
|
||||
*
|
||||
* @Rule public AkkaJUnitActorSystemResource actorSystemResource = new
|
||||
* AkkaJUnitActorSystemResource(name, config);
|
||||
*
|
||||
* private final ActorSystem system = actorSystemResource.getSystem();
|
||||
*/
|
||||
|
||||
public class AkkaJUnitActorSystemResource extends ExternalResource {
|
||||
private ActorSystem system = null;
|
||||
private final String name;
|
||||
private final Config config;
|
||||
|
||||
private ActorSystem createSystem(String name, Config config) {
|
||||
try {
|
||||
if (config == null)
|
||||
return ActorSystem.create(name);
|
||||
else
|
||||
return ActorSystem.create(name, config);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public AkkaJUnitActorSystemResource(String name, Config config) {
|
||||
this.name = name;
|
||||
this.config = config;
|
||||
system = createSystem(name, config);
|
||||
}
|
||||
|
||||
public AkkaJUnitActorSystemResource(String name) {
|
||||
this(name, AkkaSpec.testConf());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void before() throws Throwable {
|
||||
// Sometimes the ExternalResource seems to be reused, and
|
||||
// we don't run the constructor again, so if that's the case
|
||||
// then create the system here
|
||||
if (system == null) {
|
||||
system = createSystem(name, config);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void after() {
|
||||
JavaTestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
}
|
||||
|
||||
public ActorSystem getSystem() {
|
||||
return system;
|
||||
}
|
||||
}
|
||||
370
akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java
Normal file
370
akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java
Normal file
|
|
@ -0,0 +1,370 @@
|
|||
package akka.stream.javadsl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.reactivestreams.api.Producer;
|
||||
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Function;
|
||||
import akka.japi.Function2;
|
||||
import akka.japi.Procedure;
|
||||
import akka.japi.Util;
|
||||
import akka.stream.FlowMaterializer;
|
||||
import akka.stream.MaterializerSettings;
|
||||
import akka.stream.RecoveryTransformer;
|
||||
import akka.stream.Transformer;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.testkit.JavaTestKit;
|
||||
|
||||
public class FlowTest {
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("StashJavaAPI",
|
||||
AkkaSpec.testConf());
|
||||
|
||||
final ActorSystem system = actorSystemResource.getSystem();
|
||||
|
||||
final MaterializerSettings settings = MaterializerSettings.create();
|
||||
final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseSimpleOperators() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final String[] lookup = { "a", "b", "c", "d", "e", "f" };
|
||||
final java.util.Iterator<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5).iterator();
|
||||
Flow.create(input).drop(2).take(3).map(new Function<Integer, String>() {
|
||||
public String apply(Integer elem) {
|
||||
return lookup[elem];
|
||||
}
|
||||
}).filter(new Predicate<String>() {
|
||||
public boolean test(String elem) {
|
||||
return !elem.equals("c");
|
||||
}
|
||||
}).grouped(2).mapConcat(new Function<java.util.List<String>, java.util.List<String>>() {
|
||||
public java.util.List<String> apply(java.util.List<String> elem) {
|
||||
return elem;
|
||||
}
|
||||
}).fold("", new Function2<String, String, String>() {
|
||||
public String apply(String acc, String elem) {
|
||||
return acc + elem;
|
||||
}
|
||||
}).foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
probe.expectMsgEquals("de");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseTransform() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final JavaTestKit probe2 = new JavaTestKit(system);
|
||||
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
|
||||
// duplicate each element, stop after 4 elements, and emit sum to the end
|
||||
Flow.create(input).transform(new Transformer<Integer, Integer>() {
|
||||
int sum = 0;
|
||||
int count = 0;
|
||||
|
||||
@Override
|
||||
public scala.collection.immutable.Seq<Integer> onNext(Integer element) {
|
||||
sum += element;
|
||||
count += 1;
|
||||
return Util.immutableSeq(new Integer[] { element, element });
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isComplete() {
|
||||
return count == 4;
|
||||
}
|
||||
|
||||
@Override
|
||||
public scala.collection.immutable.Seq<Integer> onComplete() {
|
||||
return Util.immutableSingletonSeq(sum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
probe2.getRef().tell("cleanup", ActorRef.noSender());
|
||||
}
|
||||
}).foreach(new Procedure<Integer>() {
|
||||
public void apply(Integer elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
probe.expectMsgEquals(0);
|
||||
probe.expectMsgEquals(0);
|
||||
probe.expectMsgEquals(1);
|
||||
probe.expectMsgEquals(1);
|
||||
probe.expectMsgEquals(2);
|
||||
probe.expectMsgEquals(2);
|
||||
probe.expectMsgEquals(3);
|
||||
probe.expectMsgEquals(3);
|
||||
probe.expectMsgEquals(6);
|
||||
probe2.expectMsgEquals("cleanup");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseTransformRecover() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5);
|
||||
Flow.create(input).map(new Function<Integer, Integer>() {
|
||||
public Integer apply(Integer elem) {
|
||||
if (elem == 4)
|
||||
throw new IllegalArgumentException("4 not allowed");
|
||||
else
|
||||
return elem + elem;
|
||||
}
|
||||
}).transformRecover(new RecoveryTransformer<Integer, String>() {
|
||||
|
||||
@Override
|
||||
public scala.collection.immutable.Seq<String> onNext(Integer element) {
|
||||
return Util.immutableSingletonSeq(element.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public scala.collection.immutable.Seq<String> onErrorRecover(Throwable e) {
|
||||
return Util.immutableSingletonSeq(e.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isComplete() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public scala.collection.immutable.Seq<String> onComplete() {
|
||||
return Util.immutableSeq(new String[0]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
}
|
||||
|
||||
}).foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
probe.expectMsgEquals("0");
|
||||
probe.expectMsgEquals("2");
|
||||
probe.expectMsgEquals("4");
|
||||
probe.expectMsgEquals("6");
|
||||
probe.expectMsgEquals("4 not allowed");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseGroupBy() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee");
|
||||
Flow.create(input).groupBy(new Function<String, String>() {
|
||||
public String apply(String elem) {
|
||||
return elem.substring(0, 1);
|
||||
}
|
||||
}).foreach(new Procedure<Pair<String, Producer<String>>>() {
|
||||
public void apply(final Pair<String, Producer<String>> pair) {
|
||||
Flow.create(pair.b()).foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
probe.getRef().tell(new Pair<String, String>(pair.a(), elem), ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
Map<String, List<String>> grouped = new HashMap<String, List<String>>();
|
||||
for (Object o : probe.receiveN(5)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Pair<String, String> p = (Pair<String, String>) o;
|
||||
List<String> g = grouped.get(p.a());
|
||||
if (g == null)
|
||||
g = new ArrayList<String>();
|
||||
g.add(p.b());
|
||||
grouped.put(p.a(), g);
|
||||
}
|
||||
|
||||
assertEquals(Arrays.asList("Aaa", "Abb"), grouped.get("A"));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseSplitWhen() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input = Arrays.asList("A", "B", "C", "\n", "D", "\n", "E", "F");
|
||||
Flow.create(input).splitWhen(new Predicate<String>() {
|
||||
public boolean test(String elem) {
|
||||
return elem.equals("\n");
|
||||
}
|
||||
}).foreach(new Procedure<Producer<String>>() {
|
||||
public void apply(Producer<String> subProducer) {
|
||||
Flow.create(subProducer).filter(new Predicate<String>() {
|
||||
public boolean test(String elem) {
|
||||
return !elem.equals("\n");
|
||||
}
|
||||
}).grouped(10).foreach(new Procedure<List<String>>() {
|
||||
public void apply(List<String> chunk) {
|
||||
probe.getRef().tell(chunk, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
for (Object o : probe.receiveN(3)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> chunk = (List<String>) o;
|
||||
if (chunk.get(0).equals("A"))
|
||||
assertEquals(Arrays.asList("A", "B", "C"), chunk);
|
||||
else if (chunk.get(0).equals("D"))
|
||||
assertEquals(Arrays.asList("D"), chunk);
|
||||
else if (chunk.get(0).equals("E"))
|
||||
assertEquals(Arrays.asList("E", "F"), chunk);
|
||||
else
|
||||
assertEquals("[A, B, C] or [D] or [E, F]", chunk);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseMerge() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input1 = Arrays.asList("A", "B", "C");
|
||||
final java.lang.Iterable<String> input2 = Arrays.asList("D", "E", "F");
|
||||
Flow.create(input1).merge(Flow.create(input2).toProducer(materializer)).foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
Set<Object> output = new HashSet<Object>(Arrays.asList(probe.receiveN(6)));
|
||||
assertEquals(new HashSet<Object>(Arrays.asList("A", "B", "C", "D", "E", "F")), output);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseZip() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input1 = Arrays.asList("A", "B", "C");
|
||||
final java.lang.Iterable<Integer> input2 = Arrays.asList(1, 2, 3);
|
||||
Flow.create(input1).zip(Flow.create(input2).toProducer(materializer))
|
||||
.foreach(new Procedure<Pair<String, Integer>>() {
|
||||
public void apply(Pair<String, Integer> elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
List<Object> output = Arrays.asList(probe.receiveN(3));
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Pair<String, Integer>> expected = Arrays.asList(new Pair<String, Integer>("A", 1), new Pair<String, Integer>(
|
||||
"B", 2), new Pair<String, Integer>("C", 3));
|
||||
assertEquals(expected, output);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseConcat() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input1 = Arrays.asList("A", "B", "C");
|
||||
final java.lang.Iterable<String> input2 = Arrays.asList("D", "E", "F");
|
||||
Flow.create(input1).concat(Flow.create(input2).toProducer(materializer)).foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
List<Object> output = Arrays.asList(probe.receiveN(6));
|
||||
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseCallableInput() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final Callable<Integer> input = new Callable<Integer>() {
|
||||
int countdown = 5;
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
if (countdown == 0)
|
||||
throw akka.stream.Stop.getInstance();
|
||||
else {
|
||||
countdown -= 1;
|
||||
return countdown;
|
||||
}
|
||||
}
|
||||
};
|
||||
Flow.create(input).foreach(new Procedure<Integer>() {
|
||||
public void apply(Integer elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
|
||||
List<Object> output = Arrays.asList(probe.receiveN(5));
|
||||
assertEquals(Arrays.asList(4, 3, 2, 1, 0), output);
|
||||
probe.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseOnCompleteSuccess() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input = Arrays.asList("A", "B", "C");
|
||||
Flow.create(input).onComplete(materializer, new OnCompleteCallback() {
|
||||
@Override
|
||||
public void onComplete(Throwable e) {
|
||||
if (e == null)
|
||||
probe.getRef().tell("done", ActorRef.noSender());
|
||||
else
|
||||
probe.getRef().tell(e, ActorRef.noSender());
|
||||
}
|
||||
});
|
||||
|
||||
probe.expectMsgEquals("done");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseOnCompleteError() {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input = Arrays.asList("A", "B", "C");
|
||||
Flow.create(input).map(new Function<String, String>() {
|
||||
public String apply(String arg0) throws Exception {
|
||||
throw new RuntimeException("simulated err");
|
||||
}
|
||||
}).onComplete(materializer, new OnCompleteCallback() {
|
||||
@Override
|
||||
public void onComplete(Throwable e) {
|
||||
if (e == null)
|
||||
probe.getRef().tell("done", ActorRef.noSender());
|
||||
else
|
||||
probe.getRef().tell(e, ActorRef.noSender());
|
||||
}
|
||||
});
|
||||
|
||||
probe.expectMsgEquals("simulated err");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseToFuture() throws Exception {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input = Arrays.asList("A", "B", "C");
|
||||
Future<String> future = Flow.create(input).toFuture(materializer);
|
||||
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
|
||||
assertEquals("A", result);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -11,7 +11,6 @@ import akka.testkit.EventFilter
|
|||
import scala.util.Failure
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.RecoveryTransformer
|
||||
import scala.util.Try
|
||||
import scala.util.Success
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import akka.testkit.EventFilter
|
|||
import com.typesafe.config.ConfigFactory
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.testkit.TestProbe
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import akka.stream.impl.Ast
|
|||
import akka.testkit.{ TestEvent, EventFilter }
|
||||
import akka.stream.impl.ActorBasedFlowMaterializer
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import akka.actor.ActorRef
|
|||
import scala.collection.immutable.TreeSet
|
||||
import scala.util.control.NonFatal
|
||||
import akka.stream.impl.ActorBasedFlowMaterializer
|
||||
import akka.stream.scaladsl.Transformer
|
||||
import akka.stream.impl.FlowNameCounter
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ class TcpFlowSpec extends AkkaSpec {
|
|||
|
||||
def connect(server: Server): (Processor[ByteString, ByteString], ServerConnection) = {
|
||||
val tcpProbe = TestProbe()
|
||||
tcpProbe.send(IO(StreamTcp), StreamTcp.Connect(server.address, settings = settings))
|
||||
tcpProbe.send(IO(StreamTcp), StreamTcp.Connect(settings, server.address))
|
||||
val client = server.waitAccept()
|
||||
val outgoingConnection = tcpProbe.expectMsgType[StreamTcp.OutgoingTcpConnection]
|
||||
|
||||
|
|
@ -188,13 +188,13 @@ class TcpFlowSpec extends AkkaSpec {
|
|||
|
||||
def connect(serverAddress: InetSocketAddress): StreamTcp.OutgoingTcpConnection = {
|
||||
val connectProbe = TestProbe()
|
||||
connectProbe.send(IO(StreamTcp), StreamTcp.Connect(serverAddress, settings = settings))
|
||||
connectProbe.send(IO(StreamTcp), StreamTcp.Connect(settings, serverAddress))
|
||||
connectProbe.expectMsgType[StreamTcp.OutgoingTcpConnection]
|
||||
}
|
||||
|
||||
def bind(serverAddress: InetSocketAddress = temporaryServerAddress): StreamTcp.TcpServerBinding = {
|
||||
val bindProbe = TestProbe()
|
||||
bindProbe.send(IO(StreamTcp), StreamTcp.Bind(serverAddress, settings = settings))
|
||||
bindProbe.send(IO(StreamTcp), StreamTcp.Bind(settings, serverAddress))
|
||||
bindProbe.expectMsgType[StreamTcp.TcpServerBinding]
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue