2014-04-23 10:05:09 +02:00
/* *
* Copyright ( C ) 2014 Typesafe Inc . < http : //www.typesafe.com>
*/
package akka.stream.javadsl
2014-08-22 11:42:05 +02:00
import akka.stream._
2015-01-28 14:19:50 +01:00
import akka.japi. { Util , Pair }
2014-10-27 14:35:41 +01:00
import akka.stream.scaladsl
2014-10-03 17:33:14 +02:00
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
2014-11-12 10:43:39 +01:00
import akka.stream.stage.Stage
2015-01-28 14:19:50 +01:00
import akka.stream.impl.StreamLayout
2014-04-23 10:05:09 +02:00
2014-10-03 17:33:14 +02:00
/**
 * Java API
 *
 * Factory methods for obtaining [[javadsl.Flow]] instances, either from scratch
 * or by adapting an existing Scala DSL flow or flow-shaped graph.
 */
object Flow {

  import akka.stream.scaladsl.JavaConverters._

  /** Factory instance exposing the creation methods declared by `FlowCreate`. */
  val factory: FlowCreate = new FlowCreate {}

  /** Adapt [[scaladsl.Flow]] for use within Java DSL */
  def adapt[I, O, M](flow: scaladsl.Flow[I, O, M]): javadsl.Flow[I, O, M] =
    new Flow(flow)

  /**
   * Create a `Flow` which can process elements of type `T`.
   *
   * Equivalent to [[#create]].
   */
  def empty[T](): javadsl.Flow[T, T, Unit] =
    create[T]()

  /** Create a `Flow` which can process elements of type `T`. */
  def create[T](): javadsl.Flow[T, T, Unit] =
    adapt(scaladsl.Flow[T])

  /**
   * Create a `Flow` which can process elements of type `T`.
   *
   * The `clazz` argument is not inspected; it only lets the caller fix the
   * element type `T` at the call site.
   */
  def of[T](clazz: Class[T]): javadsl.Flow[T, T, Unit] =
    create[T]()

  /**
   * A graph with the shape of a flow logically is a flow, this method makes
   * it so also in type.
   */
  def wrap[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] =
    adapt(scaladsl.Flow.wrap(g))
}
2014-05-16 14:21:15 +02:00
2014-10-20 14:09:24 +02:00
/**
 * Java API
 *
 * A stream processing step with one input (`In`) and one output (`Out`) that yields a
 * value of type `Mat` when materialized. This class is a thin adapter: every operation
 * is forwarded to the wrapped [[scaladsl.Flow]] and the result is re-wrapped for the Java DSL.
 */
class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph[FlowShape[In, Out], Mat] {
  import scala.collection.JavaConverters._
  import akka.stream.scaladsl.JavaConverters._

  override def shape: FlowShape[In, Out] = delegate.shape
  private[stream] def module: StreamLayout.Module = delegate.module

  /** Converts this Flow to its Scala DSL counterpart */
  def asScala: scaladsl.Flow[In, Out, Mat] = delegate

  /**
   * Transform only the materialized value of this Flow, leaving all other properties as they were.
   */
  def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
    new Flow(delegate.mapMaterialized(f.apply _))

  /**
   * Transform this [[Flow]] by appending the given processing steps.
   *
   * The resulting flow keeps the materialized value type of this flow.
   */
  def via[T, M](flow: javadsl.Flow[Out, T, M]): javadsl.Flow[In, T, Mat] =
    new Flow(delegate.via(flow.asScala))

  /**
   * Transform this [[Flow]] by appending the given processing steps.
   *
   * The `combine` function is used to compose the materialized values of this flow and
   * the appended flow into the materialized value of the resulting [[Flow]].
   */
  def via[T, M, M2](flow: javadsl.Flow[Out, T, M], combine: japi.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
    new Flow(delegate.viaMat(flow.asScala)(combinerToScala(combine)))

  /**
   * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
   *
   * The resulting sink keeps the materialized value type of this flow.
   */
  def to(sink: javadsl.Sink[Out, _]): javadsl.Sink[In, Mat] =
    new Sink(delegate.to(sink.asScala))

  /**
   * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
   *
   * The `combine` function is used to compose the materialized values of this flow and
   * the sink into the materialized value of the resulting [[Sink]].
   */
  def to[M, M2](sink: javadsl.Sink[Out, M], combine: japi.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
    new Sink(delegate.toMat(sink.asScala)(combinerToScala(combine)))

  /**
   * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]
   */
  def join[M](flow: javadsl.Flow[Out, In, M]): javadsl.RunnableFlow[Mat] =
    new RunnableFlowAdapter(delegate.join(flow.asScala))

  /**
   * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]
   *
   * The `combine` function is used to compose the materialized values of both flows
   * into the materialized value of the resulting [[RunnableFlow]].
   */
  def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: japi.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
    new RunnableFlowAdapter(delegate.joinMat(flow.asScala)(combinerToScala(combine)))

  /**
   * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack:
   * {{{
   * +---------------------------+
   * | Resulting Flow            |
   * |                           |
   * | +------+        +------+  |
   * | |      | ~Out~> |      | ~~> O2
   * | | flow |        | bidi |  |
   * | |      | <~In~  |      | <~~ I2
   * | +------+        +------+  |
   * +---------------------------+
   * }}}
   * The materialized value of the combined [[Flow]] will be the materialized
   * value of the current flow (ignoring the [[BidiFlow]]’s value), use
   * the `join` overload that takes a `combine` function if a different strategy is needed.
   */
  def join[I2, O2, Mat2](bidi: BidiFlow[Out, O2, I2, In, Mat2]): Flow[I2, O2, Mat] =
    new Flow(delegate.join(bidi.asScala))

  /**
   * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack:
   * {{{
   * +---------------------------+
   * | Resulting Flow            |
   * |                           |
   * | +------+        +------+  |
   * | |      | ~Out~> |      | ~~> O2
   * | | flow |        | bidi |  |
   * | |      | <~In~  |      | <~~ I2
   * | +------+        +------+  |
   * +---------------------------+
   * }}}
   * The `combine` function is used to compose the materialized values of this flow and that
   * [[BidiFlow]] into the materialized value of the resulting [[Flow]].
   */
  def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: japi.Function2[Mat, Mat2, M]): Flow[I2, O2, M] =
    new Flow(delegate.joinMat(bidi.asScala)(combinerToScala(combine)))

  /**
   * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it.
   *
   * The returned pair contains the materialized values of the `Source` and `Sink`,
   * e.g. the `Subscriber` of a `Source.subscriber` and `Publisher` of a `Sink.publisher`.
   *
   * @tparam T materialized type of given Source
   * @tparam U materialized type of given Sink
   */
  def runWith[T, U](source: javadsl.Source[In, T], sink: javadsl.Sink[Out, U], materializer: FlowMaterializer): akka.japi.Pair[T, U] = {
    // The Scala DSL result is a tuple of the two materialized values; no casts are needed
    // to repackage it as a Java-friendly Pair.
    val p = delegate.runWith(source.asScala, sink.asScala)(materializer)
    akka.japi.Pair(p._1, p._2)
  }

  /**
   * Transform this stream by applying the given function to each of the elements
   * as they pass through this processing step.
   */
  def map[T](f: japi.Function[Out, T]): javadsl.Flow[In, T, Mat] =
    new Flow(delegate.map(f.apply))

  /**
   * Transform each input element into a sequence of output elements that is
   * then flattened into the output stream.
   */
  def mapConcat[T](f: japi.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] =
    new Flow(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem))))

  /**
   * Transform this stream by applying the given function to each of the elements
   * as they pass through this processing step. The function returns a `Future` and the
   * value of that future will be emitted downstream. As many futures as requested elements by
   * downstream may run in parallel and may complete in any order, but the elements that
   * are emitted downstream are in the same order as received from upstream.
   *
   * If the function `f` throws an exception or if the `Future` is completed
   * with failure and the supervision decision is [[akka.stream.Supervision#stop]]
   * the stream will be completed with failure.
   *
   * If the function `f` throws an exception or if the `Future` is completed
   * with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
   * [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
   *
   * @see [[#mapAsyncUnordered]]
   */
  def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
    new Flow(delegate.mapAsync(f.apply))

  /**
   * Transform this stream by applying the given function to each of the elements
   * as they pass through this processing step. The function returns a `Future` and the
   * value of that future will be emitted downstream. As many futures as requested elements by
   * downstream may run in parallel and each processed element will be emitted downstream
   * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream
   * in the same order as received from upstream.
   *
   * If the function `f` throws an exception or if the `Future` is completed
   * with failure and the supervision decision is [[akka.stream.Supervision#stop]]
   * the stream will be completed with failure.
   *
   * If the function `f` throws an exception or if the `Future` is completed
   * with failure and the supervision decision is [[akka.stream.Supervision#resume]] or
   * [[akka.stream.Supervision#restart]] the element is dropped and the stream continues.
   *
   * @see [[#mapAsync]]
   */
  def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
    new Flow(delegate.mapAsyncUnordered(f.apply))

  /**
   * Only pass on those elements that satisfy the given predicate.
   */
  def filter(p: japi.Predicate[Out]): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.filter(p.test))

  /**
   * Transform this stream by applying the given partial function to each of the elements
   * on which the function is defined as they pass through this processing step.
   * Non-matching elements are filtered out.
   */
  def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] =
    new Flow(delegate.collect(pf))

  /**
   * Chunk up this stream into groups of the given size, with the last group
   * possibly smaller than requested due to end-of-stream.
   *
   * `n` must be positive, otherwise IllegalArgumentException is thrown.
   */
  def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
    new Flow(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step

  /**
   * Similar to `fold` but is not a terminal operation,
   * emits its current value which starts at `zero` and then
   * applies the current and next value to the given function `f`,
   * emitting the next current value.
   *
   * If the function `f` throws an exception and the supervision decision is
   * [[akka.stream.Supervision#restart]] current value starts at `zero` again
   * and the stream will continue.
   */
  def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
    new Flow(delegate.scan(zero)(f.apply))

  /**
   * Chunk up this stream into groups of elements received within a time window,
   * or limited by the given number of elements, whatever happens first.
   * Empty groups will not be emitted if no elements are received from upstream.
   * The last group before end-of-stream will contain the buffered elements
   * since the previously emitted group.
   *
   * `n` must be positive, and `d` must be greater than 0 seconds, otherwise
   * IllegalArgumentException is thrown.
   */
  def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
    new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step

  /**
   * Discard the given number of elements at the beginning of the stream.
   * No elements will be dropped if `n` is zero or negative.
   */
  def drop(n: Long): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.drop(n))

  /**
   * Discard the elements received within the given duration at beginning of the stream.
   */
  def dropWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.dropWithin(d))

  /**
   * Terminate processing (and cancel the upstream publisher) after the given
   * number of elements. Due to input buffering some elements may have been
   * requested from upstream publishers that will then not be processed downstream
   * of this step.
   *
   * The stream will be completed without producing any elements if `n` is zero
   * or negative.
   */
  def take(n: Long): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.take(n))

  /**
   * Terminate processing (and cancel the upstream publisher) after the given
   * duration. Due to input buffering some elements may have been
   * requested from upstream publishers that will then not be processed downstream
   * of this step.
   *
   * Note that this can be combined with [[#take]] to limit the number of elements
   * within the duration.
   */
  def takeWithin(d: FiniteDuration): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.takeWithin(d))

  /**
   * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
   * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
   * upstream publisher is faster.
   *
   * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
   * duplicate elements.
   *
   * @param seed Provides the first state for a conflated value using the first unconsumed element as a start
   * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
   */
  def conflate[S](seed: japi.Function[Out, S], aggregate: japi.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] =
    new Flow(delegate.conflate(seed.apply)(aggregate.apply))

  /**
   * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older
   * element until new element comes from the upstream. For example an expand step might repeat the last element for
   * the subscriber until it receives an update from upstream.
   *
   * This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
   * This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
   * subscriber.
   *
   * Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]].
   * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure.
   *
   * @param seed Provides the first state for extrapolation using the first unconsumed element
   * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
   *                    state.
   */
  def expand[S, U](seed: japi.Function[Out, S], extrapolate: japi.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U, Mat] =
    new Flow(delegate.expand(seed(_))(s ⇒ {
      // Unpack the Java Pair into the Scala tuple expected by the Scala DSL.
      val p = extrapolate(s)
      (p.first, p.second)
    }))

  /**
   * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.
   * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements or backpressure the upstream if
   * there is no space available.
   *
   * @param size The size of the buffer in element count
   * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
   */
  def buffer(size: Int, overflowStrategy: OverflowStrategy): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.buffer(size, overflowStrategy))

  /**
   * Generic transformation of a stream with a custom processing [[akka.stream.stage.Stage]].
   * This operator makes it possible to extend the `Flow` API when there is no specialized
   * operator that performs the transformation.
   */
  def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Flow[In, U, Mat] =
    new Flow(delegate.transform(() ⇒ mkStage.create()))

  /**
   * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element
   * and a stream representing the remaining elements. If `n` is zero or negative, then this will return a pair
   * of an empty collection and a stream containing the whole upstream unchanged.
   */
  def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
    new Flow(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ akka.japi.Pair(taken.asJava, tail.asJava) })

  /**
   * This operation demultiplexes the incoming stream into separate output
   * streams, one for each element key. The key is computed for each element
   * using the given function. When a new key is encountered for the first time
   * it is emitted to the downstream subscriber together with a fresh
   * flow that will eventually produce all the elements of the substream
   * for that key. Not consuming the elements from the created streams will
   * stop this processor from processing more elements, therefore you must take
   * care to unblock (or cancel) all of the produced streams even if you want
   * to consume only one of them.
   *
   * If the group by function `f` throws an exception and the supervision decision
   * is [[akka.stream.Supervision#stop]] the stream and substreams will be completed
   * with failure.
   *
   * If the group by function `f` throws an exception and the supervision decision
   * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
   * the element is dropped and the stream and substreams continue.
   */
  def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
    new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step

  /**
   * This operation applies the given predicate to all incoming elements and
   * emits them to a stream of output streams, always beginning a new one with
   * the current element if the given predicate returns true for it. This means
   * that for the following series of predicate values, three substreams will
   * be produced with lengths 1, 2, and 3:
   *
   * {{{
   * false,             // element goes into first substream
   * true, false,       // elements go into second substream
   * true, false, false // elements go into third substream
   * }}}
   *
   * If the split predicate `p` throws an exception and the supervision decision
   * is [[akka.stream.Supervision#stop]] the stream and substreams will be completed
   * with failure.
   *
   * If the split predicate `p` throws an exception and the supervision decision
   * is [[akka.stream.Supervision#resume]] or [[akka.stream.Supervision#restart]]
   * the element is dropped and the stream and substreams continue.
   */
  def splitWhen(p: japi.Predicate[Out]): javadsl.Flow[In, Source[Out, Unit], Mat] =
    new Flow(delegate.splitWhen(p.test).map(_.asJava))

  /**
   * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
   * This operation can be used on a stream of element type [[Source]].
   */
  def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] =
    new Flow(delegate.flatten(strategy))

  /**
   * Returns a new `Flow` that concatenates a secondary `Source` to this flow so that,
   * the first element emitted by the given ("second") source is emitted after the last element of this Flow.
   *
   * The materialized value of the result is a [[akka.japi.Pair]] of this flow's and the
   * second source's materialized values.
   */
  def concat[M](second: javadsl.Source[Out @uncheckedVariance, M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] =
    new Flow(delegate.concat(second.asScala).mapMaterialized(p ⇒ Pair(p._1, p._2)))

  /**
   * Applies given [[OperationAttributes]] to a given section.
   *
   * The `section` function receives a Java DSL flow and must return the Java DSL flow
   * that makes up the section; it is adapted to and from the Scala DSL around the call.
   */
  def section[O, M](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Unit], javadsl.Flow[Out, O, M]] @uncheckedVariance): javadsl.Flow[In, O, M] =
    new Flow(delegate.section(attributes.asScala) {
      val scalaToJava = (flow: scaladsl.Flow[Out, Out, Unit]) ⇒ new javadsl.Flow(flow)
      val javaToScala = (flow: javadsl.Flow[Out, O, M]) ⇒ flow.asScala
      scalaToJava andThen section.apply andThen javaToScala
    })

  /**
   * Returns a new `Flow` wrapping the result of applying the given [[OperationAttributes]]
   * to the underlying Scala DSL flow via its `withAttributes`.
   */
  def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.withAttributes(attr.asScala))

  /**
   * Returns a new `Flow` wrapping the result of calling `named` with the given `name`
   * on the underlying Scala DSL flow.
   */
  def named(name: String): javadsl.Flow[In, Out, Mat] =
    new Flow(delegate.named(name))
}
2014-05-15 09:35:42 +02:00
2014-10-03 17:33:14 +02:00
/**
 * Java API
 *
 * Flow with attached input and output, can be executed.
 */
trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] {

  /**
   * Run this flow and return the materialized values of the flow.
   *
   * @param materializer the [[FlowMaterializer]] used to run the flow
   * @return the materialized value, of type `Mat`
   */
  def run(materializer: FlowMaterializer): Mat

  /**
   * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were.
   *
   * @param f function applied to the materialized value
   * @return a [[RunnableFlow]] whose materialized value has type `Mat2`
   */
  def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2]
}
2014-08-15 15:37:09 +02:00
2014-10-03 17:33:14 +02:00
/**
 * INTERNAL API
 *
 * Adapts a [[scaladsl.RunnableFlow]] to the Java DSL [[RunnableFlow]] interface by
 * forwarding every call to the wrapped instance.
 */
private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] {

  def shape = ClosedShape

  def module = runnable.module

  /** Runs the wrapped flow, passing `materializer` as its explicit materializer argument. */
  override def run(materializer: FlowMaterializer): Mat =
    runnable.run()(materializer)

  /** Maps the materialized value of the wrapped flow through `f` and re-wraps the result. */
  override def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] =
    new RunnableFlowAdapter(runnable.mapMaterialized(mat ⇒ f.apply(mat)))
}