diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index 393f14d8e3..e42e0439c1 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -42,6 +42,7 @@ take the specified number of elements to take has not yet been takeWhile the predicate is true and until the first false result downstream backpressures predicate returned false or upstream completes dropWhile the predicate returned false and for all following stream elements predicate returned false and downstream backpressures upstream completes recover the element is available from the upstream or upstream is failed and pf returns an element downstream backpressures, not when failure happened upstream completes or upstream failed with exception pf can handle +detach the upstream stage has emitted and there is demand downstream backpressures upstream completes ===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== Asynchronous processing stages diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 42c0fffac4..4bb2dc4e41 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -585,7 +585,10 @@ object TestSubscriber { private lazy val subscription = expectSubscription() /** Asserts that a subscription has been received or will be received */ - def ensureSubscription(): Unit = subscription // initializes lazy val + def ensureSubscription(): Self = { + subscription // initializes lazy val + this + } def request(n: Long): Self = { subscription.request(n) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDetacherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDetacherSpec.scala new file mode 100644 index 0000000000..ff65b991e7 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDetacherSpec.scala @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import org.scalactic.ConversionCheckedTripleEquals +import org.scalatest.concurrent.ScalaFutures +import akka.stream.testkit.AkkaSpec +import akka.stream._ +import akka.stream.scaladsl._ +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.Utils + +class FlowDetacherSpec extends AkkaSpec with ConversionCheckedTripleEquals with ScalaFutures { + + implicit val materializer = ActorMaterializer() + implicit val patience = PatienceConfig(2.seconds) + + "A Detacher" must { + + "pass through all elements" in Utils.assertAllStagesStopped { + Source(1 to 100) + .detach + .runWith(Sink.seq) + .futureValue should ===(1 to 100) + } + + "pass through failure" in Utils.assertAllStagesStopped { + val ex = new Exception("buh") + val result = Source(1 to 100) + .map(x ⇒ if (x == 50) throw ex else x) + .detach + .runWith(Sink.seq) + intercept[Exception] { + Await.result(result, 2.seconds) + } should ===(ex) + + } + + "emit the last element when completed without demand" in Utils.assertAllStagesStopped { + Source.single(42) + .detach + .runWith(TestSink.probe) + .ensureSubscription() + .expectNoMsg(500.millis) + .requestNext() should ===(42) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index cd170aaa87..878cb822de 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -78,13 +78,17 @@ object GraphStages { tryPull(in) } } + override def onUpstreamFinish(): Unit = { + if (!isAvailable(in)) completeStage() + } }) setHandler(out, new OutHandler { override def onPull(): Unit = { if (isAvailable(in)) { push(out, grab(in)) - tryPull(in) + if (isClosed(in)) completeStage() + else pull(in) } } }) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 9930543f8c..54f9477742 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1403,6 +1403,20 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Flow[In, Out, Mat] = new Flow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** + * Detaches upstream demand from downstream demand without detaching the + * stream rates; in other words acts like a buffer of size 1. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def detach: javadsl.Flow[In, Out, Mat] = new Flow(delegate.detach) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9f5e2fa465..7baaddb18d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1571,6 +1571,20 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.Source[Out, Mat] = new Source(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)) + /** + * Detaches upstream demand from downstream demand without detaching the + * stream rates; in other words acts like a buffer of size 1. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def detach: javadsl.Source[Out, Mat] = new Source(delegate.detach) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 45b04fdbb7..654c29e27a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1012,6 +1012,20 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.throttle(cost, per, maximumBurst, costCalculation.apply, mode)) + /** + * Detaches upstream demand from downstream demand without detaching the + * stream rates; in other words acts like a buffer of size 1. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def detach: javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.detach) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 11ec39881a..4e23a20864 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1011,6 +1011,20 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source costCalculation: function.Function[Out, Integer], mode: ThrottleMode): javadsl.SubSource[Out, Mat] = new SubSource(delegate.throttle(cost, per, maximumBurst, costCalculation.apply _, mode)) + /** + * Detaches upstream demand from downstream demand without detaching the + * stream rates; in other words acts like a buffer of size 1. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def detach: javadsl.SubSource[Out, Mat] = new SubSource(delegate.detach) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index ff6cb4fec6..d29f35d658 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1299,6 +1299,20 @@ trait FlowOps[+Out, +Mat] { via(new Throttle(cost, per, maximumBurst, costCalculation, mode)) } + /** + * Detaches upstream demand from downstream demand without detaching the + * stream rates; in other words acts like a buffer of size 1. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def detach: Repr[Out] = via(GraphStages.detacher) + /** * Delays the initial element by the specified duration. *