diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 5e20b5c389..daeb0b39bf 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -136,6 +136,11 @@ class FlowDocSpec extends AkkaSpec { val sink: Sink[Int, Unit] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_))) Source(1 to 6).to(sink) + // Broadcast to a sink inline + val otherSink: Sink[Int, Unit] = + Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore) + Source(1 to 6).to(otherSink) + //#flow-connecting } diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpMessageSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpMessageSpec.scala index 17779f42f1..58b27e2a9d 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpMessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpMessageSpec.scala @@ -6,10 +6,10 @@ import org.scalatest.{ Matchers, WordSpec } class HttpMessageSpec extends WordSpec with Matchers { def test(uri: String, hostHeader: Host, effectiveUri: String) = - HttpRequest.effectiveUri(Uri(uri), List(hostHeader), securedConnection = false, null) shouldEqual Uri(effectiveUri) + HttpRequest.effectiveUri(Uri(uri), List(hostHeader), securedConnection = false, null) shouldEqual Uri(effectiveUri) def fail(uri: String, hostHeader: Host) = - an [IllegalUriException] should be thrownBy + an[IllegalUriException] should be thrownBy HttpRequest.effectiveUri(Uri(uri), List(hostHeader), securedConnection = false, null) "HttpRequset" should { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala index 606175644c..6c8d642ccf 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala @@ -1,5 +1,7 @@ package akka.stream.scaladsl +import akka.stream.testkit.scaladsl.TestSink + import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ @@ -214,6 +216,25 @@ class GraphBroadcastSpec extends AkkaSpec { upsub.expectCancellation() } + "alsoTo must broadcast" in assertAllStagesStopped { + val p, p2 = TestSink.probe[Int](system) + val (ps1, ps2) = Source(1 to 6).alsoToMat(p)(Keep.right).toMat(p2)(Keep.both).run() + ps1.request(6) + ps2.request(6) + ps1.expectNext(1, 2, 3, 4, 5, 6) + ps2.expectNext(1, 2, 3, 4, 5, 6) + ps1.expectComplete() + ps2.expectComplete() + } + + "alsoTo must continue if sink cancels" in assertAllStagesStopped { + val p, p2 = TestSink.probe[Int](system) + val (ps1, ps2) = Source(1 to 6).alsoToMat(p)(Keep.right).toMat(p2)(Keep.both).run() + ps2.request(6) + ps1.cancel() + ps2.expectNext(1, 2, 3, 4, 5, 6) + ps2.expectComplete() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 45b6c42709..b428ceedbb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -814,6 +814,31 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.concatMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * through will also be sent to the [[Sink]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.alsoTo(that)) + + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + */ + def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2], + matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = + new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) + /** * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * picking randomly when several elements ready. @@ -860,14 +885,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph */ def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = { - //we need this only to have Flow of javadsl.Pair - def block(builder: FlowGraph.Builder[M], - source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = { - val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T]) - builder.from(source).to(zip.in1) - new Pair(zip.in0, zip.out) + val f = new function.Function2[FlowGraph.Builder[M], SourceShape[T], Inlet[Out]Pair Outlet[Out Pair T]] { + override def apply(b: FlowGraph.Builder[M], s: SourceShape[T]): Inlet[Out] Pair Outlet[Out Pair T] = { + val zip = b.graph(Zip.create[Out, T]) + b.from(s).to(zip.in1) + new Pair(zip.in0, zip.out) + } } - this.viaMat(Flow.factory.create(that, combinerToJava(block)), matF) + this.viaMat(Flow.factory.create(that, f), matF) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 0e17fc9e72..c9107b6443 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -340,6 +340,31 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.concatMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * through will also be sent to the [[Sink]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = + new Source(delegate.alsoTo(that)) + + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + */ + def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2], + matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = + new Source(delegate.alsoToMat(that)(combinerToScala(matF))) + /** * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * picking randomly when several elements ready. @@ -385,16 +410,8 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * @see [[#zip]]. */ def zipMat[T, M, M2](that: Graph[SourceShape[T], M], - matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = { - //we need this only to have Flow of javadsl.Pair - def block(builder: FlowGraph.Builder[M], - source: SourceShape[T]): Pair[Inlet[Out], Outlet[Pair[Out, T]]] = { - val zip: FanInShape2[Out, T, Out Pair T] = builder.graph(Zip.create[Out, T]) - builder.from(source).to(zip.in1) - new Pair(zip.in0, zip.out) - } - this.viaMat(Flow.factory.create(that, combinerToJava(block)), matF) - } + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out @uncheckedVariance Pair T, M2] = + this.viaMat(Flow.empty[Out].zipMat(that, Keep.right[Unit, M]), matF) /** * Put together the elements of current [[Source]] and the given one diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala index 3b53a11122..6d38905c95 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/package.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/package.scala @@ -20,7 +20,4 @@ package object javadsl { case other ⇒ other.apply _ } - def combinerToJava[M1, M2, M](f: (M1, M2) ⇒ M): akka.japi.function.Function2[M1, M2, M] = - new akka.japi.function.Function2[M1, M2, M] { def apply(m1: M1, m2: M2): M = f.apply(m1, m2) } - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8ac2d31770..e9f4ccd3e3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1093,6 +1093,35 @@ trait FlowOps[+Out, +Mat] { */ def ++[U >: Out, M](that: Graph[SourceShape[U], M]): Repr[U, Mat] = concat(that) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * through will also be sent to the [[Sink]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Mat] = alsoToMat(that)(Keep.left) + + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + */ + def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): Repr[Out, Mat3] = + this.viaMat(Flow(that) { implicit b ⇒ + r ⇒ + import FlowGraph.Implicits._ + val bcast = b.add(Broadcast[Out](2)) + bcast.out(1) ~> r + (bcast.in, bcast.out(0)) + })(matF) + def withAttributes(attr: Attributes): Repr[Out, Mat] /** INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 3613b68b09..69f8442c32 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -219,5 +219,4 @@ object Sink extends SinkApply { require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout)) } - }