Merge pull request #18647 from akka/wip-17827-alsoTo-√

=str - #17827 - Introduces  operator for Sources and Flows, to attach…
This commit is contained in:
Viktor Klang (√) 2015-10-09 21:06:57 +02:00
commit 92e2ac21f0
7 changed files with 114 additions and 21 deletions

View file

@ -136,6 +136,11 @@ class FlowDocSpec extends AkkaSpec {
val sink: Sink[Int, Unit] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_)))
Source(1 to 6).to(sink)
// Broadcast to a sink inline
val otherSink: Sink[Int, Unit] =
Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
Source(1 to 6).to(otherSink)
//#flow-connecting
}

View file

@ -1,5 +1,7 @@
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
@ -214,6 +216,25 @@ class GraphBroadcastSpec extends AkkaSpec {
upsub.expectCancellation()
}
"alsoTo must broadcast" in assertAllStagesStopped {
val p, p2 = TestSink.probe[Int](system)
val (ps1, ps2) = Source(1 to 6).alsoToMat(p)(Keep.right).toMat(p2)(Keep.both).run()
ps1.request(6)
ps2.request(6)
ps1.expectNext(1, 2, 3, 4, 5, 6)
ps2.expectNext(1, 2, 3, 4, 5, 6)
ps1.expectComplete()
ps2.expectComplete()
}
"alsoTo must continue if sink cancels" in assertAllStagesStopped {
val p, p2 = TestSink.probe[Int](system)
val (ps1, ps2) = Source(1 to 6).alsoToMat(p)(Keep.right).toMat(p2)(Keep.both).run()
ps2.request(6)
ps1.cancel()
ps2.expectNext(1, 2, 3, 4, 5, 6)
ps2.expectComplete()
}
}
}

View file

@ -814,6 +814,31 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* '''Emits when''' element is available and demand exists both from the Sink and the downstream.
*
* '''Backpressures when''' downstream or Sink backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* @see [[#alsoTo]]
*/
def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] =
new Flow(delegate.alsoToMat(that)(combinerToScala(matF)))
/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
@ -860,14 +885,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*/
def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = {
//we need this only to have Flow of javadsl.Pair
def block(builder: FlowGraph.Builder[M],
source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = {
val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T])
builder.from(source).to(zip.in1)
new Pair(zip.in0, zip.out)
val f = new function.Function2[FlowGraph.Builder[M], SourceShape[T], Inlet[Out]Pair Outlet[Out Pair T]] {
override def apply(b: FlowGraph.Builder[M], s: SourceShape[T]): Inlet[Out] Pair Outlet[Out Pair T] = {
val zip = b.graph(Zip.create[Out, T])
b.from(s).to(zip.in1)
new Pair(zip.in0, zip.out)
}
}
this.viaMat(Flow.factory.create(that, combinerToJava(block)), matF)
this.viaMat(Flow.factory.create(that, f), matF)
}
/**

View file

@ -340,6 +340,31 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.concatMat(that)(combinerToScala(matF)))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* '''Emits when''' element is available and demand exists both from the Sink and the downstream.
*
* '''Backpressures when''' downstream or Sink backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] =
new Source(delegate.alsoTo(that))
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* @see [[#alsoTo]]
*/
def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2],
matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] =
new Source(delegate.alsoToMat(that)(combinerToScala(matF)))
/**
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
@ -385,16 +410,8 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
* @see [[#zip]].
*/
def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = {
//we need this only to have Flow of javadsl.Pair
def block(builder: FlowGraph.Builder[M],
source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = {
val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T])
builder.from(source).to(zip.in1)
new Pair(zip.in0, zip.out)
}
this.viaMat(Flow.factory.create(that, combinerToJava(block)), matF)
}
matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] =
this.viaMat(Flow.empty[Out].zipMat(that, Keep.right[Unit, M]), matF)
/**
* Put together the elements of current [[Source]] and the given one

View file

@ -20,7 +20,4 @@ package object javadsl {
case other other.apply _
}
def combinerToJava[M1, M2, M](f: (M1, M2) M): akka.japi.function.Function2[M1, M2, M] =
new akka.japi.function.Function2[M1, M2, M] { def apply(m1: M1, m2: M2): M = f.apply(m1, m2) }
}

View file

@ -1093,6 +1093,35 @@ trait FlowOps[+Out, +Mat] {
*/
def ++[U >: Out, M](that: Graph[SourceShape[U], M]): Repr[U, Mat] = concat(that)
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* '''Emits when''' element is available and demand exists both from the Sink and the downstream.
*
* '''Backpressures when''' downstream or Sink backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Mat] = alsoToMat(that)(Keep.left)
/**
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* @see [[#alsoTo]]
*/
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): Repr[Out, Mat3] =
this.viaMat(Flow(that) { implicit b
r
import FlowGraph.Implicits._
val bcast = b.add(Broadcast[Out](2))
bcast.out(1) ~> r
(bcast.in, bcast.out(0))
})(matF)
def withAttributes(attr: Attributes): Repr[Out, Mat]
/** INTERNAL API */

View file

@ -219,5 +219,4 @@ object Sink extends SinkApply {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout))
}
}