alsoTo eager cancellation #24291

This commit is contained in:
jeremystone 2018-02-19 07:31:13 +00:00 committed by Johan Andrén
parent 28ae8d4f0e
commit e766207a87
11 changed files with 59 additions and 26 deletions

View file

@ -716,6 +716,8 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug
**completes** when upstream completes
**cancels** when downstream or `Sink` cancels
---------------------------------------------------------------
### map

View file

@ -126,10 +126,26 @@ Java
: @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch }
Sinks and flows can also be supervised, using @scala[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]
@java[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]. The `RestartSink` is restarted when
@java[`akka.stream.javadsl.RestartSink` and `akka.stream.javadsl.RestartFlow`]. The `RestartSink` is restarted when
it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out
port sends an error.
@@@ note
Care should be taken when using `GraphStage`s that conditionally propagate termination signals inside a
`RestartSource`, `RestartSink` or `RestartFlow`.
An example is a `Broadcast` stage with the default `eagerCancel = false` where
some of the outlets are for side-effecting branches (that do not re-join e.g. via a `Merge`).
A failure on a side branch will not terminate the supervised stream which will
not be restarted. Conversely, a failure on the main branch can trigger a restart but leave behind old
running instances of side branches.
In this example `eagerCancel` should probably be set to `true`, or, when only a single side branch is used, `alsoTo`
or `divertTo` should be considered as alternatives.
@@@
## Supervision Strategies
@@@ note

View file

@ -72,7 +72,6 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
val a = system.actorOf(Props(new ActorWithMaterializer(p)).withDispatcher("akka.test.stream-dispatcher"))
p.expectMsg("hello")
p.expectMsg("one")
a ! PoisonPill
val Failure(ex) = p.expectMsgType[Try[Done]]
}
@ -101,7 +100,9 @@ object ActorMaterializerSpec {
implicit val mat = ActorMaterializer(settings)(context)
Source.repeat("hello")
.alsoTo(Flow[String].take(1).to(Sink.actorRef(p.ref, "one")))
.take(1)
.concat(Source.maybe)
.map(p.ref ! _)
.runWith(Sink.onComplete(signal {
p.ref ! signal
}))

View file

@ -12,6 +12,8 @@ import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import akka.testkit.TestProbe
import scala.concurrent.Promise
object ActorRefBackpressureSinkSpec {
val initMessage = "start"
val completeMessage = "done"
@ -123,11 +125,12 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
val fw = createActor(classOf[Fw2])
val sink = Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)
.withAttributes(inputBuffer(bufferSize, bufferSize))
val probe = Source(1 to streamElementCount)
.alsoToMat(Flow[Int].take(bufferSize).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right)
val bufferFullProbe = Promise[akka.Done.type]
Source(1 to streamElementCount)
.alsoTo(Flow[Int].drop(bufferSize - 1).to(Sink.foreach(_ bufferFullProbe.trySuccess(akka.Done))))
.to(sink)
.run()
probe.futureValue should ===(akka.Done)
bufferFullProbe.future.futureValue should ===(akka.Done)
expectMsg(initMessage)
fw ! TriggerAckMessage
for (i 1 to streamElementCount) {

View file

@ -1,7 +1,8 @@
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink
import scala.concurrent.{ Future, Await }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import akka.stream._
import akka.stream.testkit._
@ -240,13 +241,22 @@ class GraphBroadcastSpec extends StreamSpec {
ps2.expectComplete()
}
"alsoTo must continue if sink cancels" in assertAllStagesStopped {
val p, p2 = TestSink.probe[Int](system)
val (ps1, ps2) = Source(1 to 6).alsoToMat(p)(Keep.right).toMat(p2)(Keep.both).run()
ps2.request(6)
ps1.cancel()
ps2.expectNext(1, 2, 3, 4, 5, 6)
ps2.expectComplete()
"cancel if alsoTo side branch cancels" in assertAllStagesStopped {
val in = TestSource.probe[Int](system)
val outSide = TestSink.probe[Int](system)
val (pIn, pSide) = in.alsoToMat(outSide)(Keep.both).toMat(Sink.ignore)(Keep.left).run()
pSide.cancel()
pIn.expectCancellation()
}
"cancel if alsoTo main branch cancels" in assertAllStagesStopped {
val in = TestSource.probe[Int](system)
val outMain = TestSink.probe[Int](system)
val (pIn, pMain) = in.alsoToMat(Sink.ignore)(Keep.left).toMat(outMain)(Keep.both).run()
pMain.cancel()
pIn.expectCancellation()
}
}

View file

@ -10,7 +10,7 @@ import akka.stream.{ ActorMaterializer, StreamDetachedException }
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import scala.concurrent.Await
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
@ -126,11 +126,12 @@ class QueueSinkSpec extends StreamSpec {
val streamElementCount = bufferSize + 4
val sink = Sink.queue[Int]()
.withAttributes(inputBuffer(bufferSize, bufferSize))
val (probe, queue) = Source(1 to streamElementCount)
.alsoToMat(Flow[Int].take(bufferSize).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right)
.toMat(sink)(Keep.both)
val bufferFullProbe = Promise[akka.Done.type]
val queue = Source(1 to streamElementCount)
.alsoTo(Flow[Int].drop(bufferSize - 1).to(Sink.foreach(_ bufferFullProbe.trySuccess(akka.Done))))
.toMat(sink)(Keep.right)
.run()
probe.futureValue should ===(akka.Done)
bufferFullProbe.future.futureValue should ===(akka.Done)
for (i 1 to streamElementCount) {
queue.pull() pipeTo testActor
expectMsg(Some(i))

View file

@ -1676,7 +1676,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
* '''Cancels when''' downstream or Sink cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.alsoTo(that))

View file

@ -732,7 +732,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
* '''Cancels when''' downstream or Sink cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] =
new Source(delegate.alsoTo(that))

View file

@ -1145,7 +1145,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
* '''Cancels when''' downstream or Sink cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.alsoTo(that))

View file

@ -1137,7 +1137,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
* '''Cancels when''' downstream or Sink cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] =
new SubSource(delegate.alsoTo(that))

View file

@ -2286,14 +2286,14 @@ trait FlowOps[+Out, +Mat] {
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream and Sink cancel
* '''Cancels when''' downstream or Sink cancels
*/
def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out] = via(alsoToGraph(that))
protected def alsoToGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
GraphDSL.create(that) { implicit b r
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Out](2))
val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
bcast.out(1) ~> r
FlowShape(bcast.in, bcast.out(0))
}