From 0d04d3bf5c07d2a0fd30ffd1495b191f08ff3e42 Mon Sep 17 00:00:00 2001 From: lolski Date: Thu, 3 Dec 2015 00:04:00 +0800 Subject: [PATCH 1/6] WIP - Base intersperse on GraphStage instead of StatefulStage. TODO: handle logic to add last element --- .../main/scala/akka/stream/impl/Stages.scala | 4 -- .../scala/akka/stream/impl/fusing/Ops.scala | 56 ++++++++----------- .../scala/akka/stream/scaladsl/Flow.scala | 6 +- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index e68ddcfe9f..823ef0689e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -188,10 +188,6 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[In, Out] = fusing.Scan(zero, f, supervision(attr)) } - final case class Intersperse[T](start: Option[T], inject: T, end: Option[T], attributes: Attributes = intersperse) extends SymbolicStage[T, T] { - override def create(attr: Attributes) = fusing.Intersperse(start, inject, end) - } - final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, attributes: Attributes = fold) extends SymbolicStage[In, Out] { override def create(attr: Attributes): Stage[In, Out] = fusing.Fold(zero, f, supervision(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 2dc1437da3..a84bc12e62 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -239,43 +239,35 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de /** * INTERNAL API */ -private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends StatefulStage[T, T] { - private var needsToEmitStart = start.isDefined +private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] { - override def initial: StageState[T, T] = - start match { - case Some(initial) ⇒ firstWithInitial(initial) - case _ ⇒ first - } + private val in = Inlet[T]("in") + private val out = Outlet[T]("out") - def firstWithInitial(initial: T) = new StageState[T, T] { - override def onPush(elem: T, ctx: Context[T]) = { - needsToEmitStart = false - emit(Iterator(initial, elem), ctx, running) - } - } + override val shape = FlowShape(in, out) - def first = new StageState[T, T] { - override def onPush(elem: T, ctx: Context[T]) = { - become(running) - ctx.push(elem) - } - } + override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + var s_ = start.isDefined + var m_ = false + var e_ = end.isDefined - def running = new StageState[T, T] { - override def onPush(elem: T, ctx: Context[T]): SyncDirective = - emit(Iterator(inject, elem), ctx) - } + setHandler(in, new InHandler { + override def onPush(): Unit = push(out, grab(in)) + }) - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - end match { - case Some(e) if needsToEmitStart ⇒ - terminationEmit(Iterator(start.get, end.get), ctx) - case Some(e) ⇒ - terminationEmit(Iterator(end.get), ctx) - case _ ⇒ - terminationEmit(Iterator(), ctx) - } + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (s_) { // emit start + push(out, start.get) + s_ = false + } else { // emit inject + if (m_) push(out, inject) + else pull(in) + + m_ = !m_ + } + } + }) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index dfdadb380d..ee9d29ef4d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,7 +9,7 @@ import akka.stream._ import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl._ -import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, MapAsync, MapAsyncUnordered, TakeWithin } +import akka.stream.impl.fusing._ import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } @@ -669,7 +669,7 @@ trait FlowOps[+Out, +Mat] { ReactiveStreamsCompliance.requireNonNullElement(start) ReactiveStreamsCompliance.requireNonNullElement(inject) ReactiveStreamsCompliance.requireNonNullElement(end) - andThen(Intersperse(Some(start), inject, Some(end))) + via(Intersperse(Some(start), inject, Some(end))) } /** @@ -696,7 +696,7 @@ trait FlowOps[+Out, +Mat] { */ def intersperse[T >: Out](inject: T): Repr[T, Mat] = { ReactiveStreamsCompliance.requireNonNullElement(inject) - andThen(Intersperse(None, inject, None)) + via(Intersperse(None, inject, None)) } /** From e2535935351972097f01684914350c95146deebc Mon Sep 17 00:00:00 2001 From: lolski Date: Sun, 6 Dec 2015 15:02:35 +0800 Subject: [PATCH 2/6] working GraphStage-based intersperse implementation --- .../scala/akka/stream/impl/fusing/Ops.scala | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index a84bc12e62..6181c75f8a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -239,7 +239,7 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de /** * INTERNAL API */ -private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] { +final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] { private val in = Inlet[T]("in") private val out = Outlet[T]("out") @@ -247,26 +247,40 @@ private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: override val shape = FlowShape(in, out) override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - var s_ = start.isDefined - var m_ = false - var e_ = end.isDefined + var first = true // is this the first element? setHandler(in, new InHandler { - override def onPush(): Unit = push(out, grab(in)) + override def onPush(): Unit = { + if (first) { + first = false + // emit: start if defined, elem + if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in))) + else emit(out, grab(in)) + } else { + // emit: inject, elem + emitMultipe(out, Iterator(inject, grab(in))) + } + } + + override def onUpstreamFinish(): Unit = { + if (first) { + // this branch is only called if upstream is an empty one + // emit: start if defined, end if defined, otherwise emit nothing + if (start.isDefined && end.isDefined) emitMultipe(out, Iterator(start.get, end.get)) + else if (start.isDefined && !end.isDefined) emit(out, start.get) + else if (!start.isDefined && end.isDefined) emit(out, end.get) + else { /* emit nothing */ } + } else { + // emit: end if defined, otherwise emit nothing + if (end.isDefined) emit(out, end.get) + else { /* emit nothing */ } + } + completeStage() + } }) setHandler(out, new OutHandler { - override def onPull(): Unit = { - if (s_) { // emit start - push(out, start.get) - s_ = false - } else { // emit inject - if (m_) push(out, inject) - else pull(in) - - m_ = !m_ - } - } + override def onPull(): Unit = pull(in) }) } } From fca478c763a68391dce6f4f48c000a1418260d36 Mon Sep 17 00:00:00 2001 From: lolski Date: Mon, 7 Dec 2015 22:48:57 +0800 Subject: [PATCH 3/6] code restructure to enhance readability --- .../scala/akka/stream/impl/fusing/Ops.scala | 48 ++++++++----------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6181c75f8a..6028a1c91c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -247,41 +247,35 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext override val shape = FlowShape(in, out) override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - var first = true // is this the first element? - - setHandler(in, new InHandler { + val startInHandler = new InHandler { override def onPush(): Unit = { - if (first) { - first = false - // emit: start if defined, elem - if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in))) - else emit(out, grab(in)) - } else { - // emit: inject, elem - emitMultipe(out, Iterator(inject, grab(in))) - } + // if else (to avoid using Iterator[T].flatten in hot code) + if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in))) + else emit(out, grab(in)) + setHandler(in, restInHandler) // switch handler } override def onUpstreamFinish(): Unit = { - if (first) { - // this branch is only called if upstream is an empty one - // emit: start if defined, end if defined, otherwise emit nothing - if (start.isDefined && end.isDefined) emitMultipe(out, Iterator(start.get, end.get)) - else if (start.isDefined && !end.isDefined) emit(out, start.get) - else if (!start.isDefined && end.isDefined) emit(out, end.get) - else { /* emit nothing */ } - } else { - // emit: end if defined, otherwise emit nothing - if (end.isDefined) emit(out, end.get) - else { /* emit nothing */ } - } + emitMultipe(out, Iterator(start, end).flatten) completeStage() } - }) + } - setHandler(out, new OutHandler { + val restInHandler = new InHandler { + override def onPush(): Unit = emitMultipe(out, Iterator(inject, grab(in))) + + override def onUpstreamFinish(): Unit = { + if (end.isDefined) emit(out, end.get) + completeStage() + } + } + + val outHandler = new OutHandler { override def onPull(): Unit = pull(in) - }) + } + + setHandler(in, startInHandler) + setHandler(out, outHandler) } } From 3152a6d15f055dcb1909cc725ac215ede8d85155 Mon Sep 17 00:00:00 2001 From: lolski Date: Wed, 9 Dec 2015 01:26:42 +0800 Subject: [PATCH 4/6] fixed typo call to emitMultiple --- .../src/main/scala/akka/stream/impl/fusing/Ops.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6028a1c91c..8820bc6a1d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -250,19 +250,19 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext val startInHandler = new InHandler { override def onPush(): Unit = { // if else (to avoid using Iterator[T].flatten in hot code) - if (start.isDefined) emitMultipe(out, Iterator(start.get, grab(in))) + if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in))) else emit(out, grab(in)) setHandler(in, restInHandler) // switch handler } override def onUpstreamFinish(): Unit = { - emitMultipe(out, Iterator(start, end).flatten) + emitMultiple(out, Iterator(start, end).flatten) completeStage() } } val restInHandler = new InHandler { - override def onPush(): Unit = emitMultipe(out, Iterator(inject, grab(in))) + override def onPush(): Unit = emitMultiple(out, Iterator(inject, grab(in))) override def onUpstreamFinish(): Unit = { if (end.isDefined) emit(out, end.get) From 967c17e7d1dd90637a39401c81bdfda50c5b4b04 Mon Sep 17 00:00:00 2001 From: lolski Date: Fri, 11 Dec 2015 23:57:56 +0800 Subject: [PATCH 5/6] adding stage completion test to intersperse --- .../stream/scaladsl/FlowIntersperseSpec.scala | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala index 0f1360692e..fa25b1644f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala @@ -4,7 +4,8 @@ package akka.stream.scaladsl import akka.stream.testkit._ -import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.Utils.assertAllStagesStopped +import akka.stream.testkit.scaladsl.{ TestSource, TestSink } import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import org.scalatest.concurrent.ScalaFutures @@ -18,7 +19,6 @@ class FlowIntersperseSpec extends AkkaSpec with ScalaFutures { implicit val materializer = ActorMaterializer(settings) "A Intersperse" must { - "inject element between existing elements" in { val probe = Source(List(1, 2, 3)) .map(_.toString) @@ -74,6 +74,29 @@ class FlowIntersperseSpec extends AkkaSpec with ScalaFutures { probe.expectSubscription() probe.toStrict(1.second).mkString("") should ===(List(1).mkString("[", ",", "]")) } + + "complete the stage when the Source has been completed" in { + val (p1, p2) = TestSource.probe[String].intersperse(",").toMat(TestSink.probe[String])(Keep.both).run + p2.request(10) + p1.sendNext("a") + .sendNext("b") + .sendComplete() + p2.expectNext("a") + .expectNext(",") + .expectNext("b") + .expectComplete() + } + + "complete the stage when the Sink has been cancelled" in { + val (p1, p2) = TestSource.probe[String].intersperse(",").toMat(TestSink.probe[String])(Keep.both).run + p2.request(10) + p1.sendNext("a") + .sendNext("b") + p2.expectNext("a") + .expectNext(",") + .cancel() + p1.expectCancellation() + } } } From 87b94202a3a1dbabad7b90d69c1ce53513f820c5 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Sat, 24 Oct 2015 00:07:51 -0400 Subject: [PATCH 6/6] +str #17967 add Sink.actorRefWithAck --- .../ActorRefBackpressureSinkSpec.scala | 115 ++++++++++++++++++ .../impl/ActorRefBackpressureSinkStage.scala | 93 ++++++++++++++ .../main/scala/akka/stream/javadsl/Sink.scala | 18 +++ .../scala/akka/stream/scaladsl/Sink.scala | 20 ++- .../scala/akka/stream/stage/GraphStage.scala | 18 ++- 5 files changed, 252 insertions(+), 12 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala new file mode 100644 index 0000000000..0df9f951d7 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -0,0 +1,115 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.actor.{ Actor, ActorRef, Props } +import akka.stream.ActorMaterializer +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl._ + +object ActorRefBackpressureSinkSpec { + val initMessage = "start" + val completeMessage = "done" + val ackMessage = "ack" + + class Fw(ref: ActorRef) extends Actor { + def receive = { + case `initMessage` ⇒ + sender() ! ackMessage + ref forward initMessage + case `completeMessage` ⇒ + ref forward completeMessage + case msg: Int ⇒ + sender() ! ackMessage + ref forward msg + } + } + + case object TriggerAckMessage + + class Fw2(ref: ActorRef) extends Actor { + var actorRef: ActorRef = Actor.noSender + + def receive = { + case TriggerAckMessage ⇒ + actorRef ! ackMessage + case msg ⇒ + actorRef = sender() + ref forward msg + } + } + +} + +class ActorRefBackpressureSinkSpec extends AkkaSpec { + import ActorRefBackpressureSinkSpec._ + implicit val mat = ActorMaterializer() + + def createActor[T](c: Class[T]) = + system.actorOf(Props(c, testActor).withDispatcher("akka.test.stream-dispatcher")) + + "An ActorRefSink" must { + + "send the elements to the ActorRef" in assertAllStagesStopped { + val fw = createActor(classOf[Fw]) + Source(List(1, 2, 3)).runWith(Sink.actorRefWithAck(fw, + initMessage, ackMessage, completeMessage)) + expectMsg("start") + expectMsg(1) + expectMsg(2) + expectMsg(3) + expectMsg(completeMessage) + } + + "send the elements to the ActorRef2" in assertAllStagesStopped { + val fw = createActor(classOf[Fw]) + val probe = TestSource.probe[Int].to(Sink.actorRefWithAck(fw, + initMessage, ackMessage, completeMessage)).run() + probe.sendNext(1) + expectMsg("start") + expectMsg(1) + probe.sendNext(2) + expectMsg(2) + probe.sendNext(3) + expectMsg(3) + probe.sendComplete() + expectMsg(completeMessage) + } + + "cancel stream when actor terminates" in assertAllStagesStopped { + val fw = createActor(classOf[Fw]) + val publisher = TestSource.probe[Int].to(Sink.actorRefWithAck(fw, + initMessage, ackMessage, completeMessage)).run().sendNext(1) + expectMsg(initMessage) + expectMsg(1) + system.stop(fw) + publisher.expectCancellation() + } + + "send message only when backpressure received" in assertAllStagesStopped { + val fw = createActor(classOf[Fw2]) + val publisher = TestSource.probe[Int].to(Sink.actorRefWithAck(fw, + initMessage, ackMessage, completeMessage)).run() + expectMsg(initMessage) + + publisher.sendNext(1) + expectNoMsg() + fw ! TriggerAckMessage + expectMsg(1) + + publisher.sendNext(2) + publisher.sendNext(3) + publisher.sendComplete() + fw ! TriggerAckMessage + expectMsg(2) + fw ! TriggerAckMessage + expectMsg(3) + + expectMsg(completeMessage) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala new file mode 100644 index 0000000000..b75c97343e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import java.util + +import akka.actor._ +import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Watch } +import akka.stream.stage.GraphStageLogic.StageActorRef +import akka.stream.{ Inlet, SinkShape, ActorMaterializer, Attributes } +import akka.stream.Attributes.InputBuffer +import akka.stream.stage._ + +/** + * INTERNAL API + */ +private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any, + ackMessage: Any, + onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any) + extends GraphStage[SinkShape[In]] { + val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") + override val shape: SinkShape[In] = SinkShape(in) + + val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + require(maxBuffer > 0, "Buffer size must be greater than 0") + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) { + implicit var self: StageActorRef = _ + + val buffer: util.Deque[In] = new util.ArrayDeque[In]() + var acknowledgementReceived = false + var completeReceived = false + + override def keepGoingAfterAllPortsClosed: Boolean = true + + private val callback: AsyncCallback[Unit] = getAsyncCallback((_: Unit) ⇒ { + if (!buffer.isEmpty) sendData() + else acknowledgementReceived = true + }) + + private val deathWatchCallback: AsyncCallback[Unit] = + getAsyncCallback((Unit) ⇒ completeStage()) + + private def receive(evt: (ActorRef, Any)): Unit = { + evt._2 match { + case `ackMessage` ⇒ callback.invoke(()) + case Terminated(`ref`) ⇒ deathWatchCallback.invoke(()) + case _ ⇒ //ignore all other messages + } + } + + override def preStart() = { + self = getStageActorRef(receive) + self.watch(ref) + ref ! onInitMessage + pull(in) + } + + private def sendData(): Unit = { + if (!buffer.isEmpty) { + ref ! buffer.poll() + acknowledgementReceived = false + } + if (buffer.isEmpty && completeReceived) finish() + } + + private def finish(): Unit = { + ref ! onCompleteMessage + completeStage() + } + + setHandler(in, new InHandler { + override def onPush(): Unit = { + buffer offer grab(in) + if (acknowledgementReceived) sendData() + if (buffer.size() < maxBuffer) pull(in) + } + override def onUpstreamFinish(): Unit = { + if (buffer.isEmpty) finish() + else completeReceived = true + } + override def onUpstreamFailure(ex: Throwable): Unit = { + ref ! onFailureMessage(ex) + failStage(ex) + } + }) + } + + override def toString = "ActorRefBackpressureSink" +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 0c40e382ac..b190a514a8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -8,6 +8,7 @@ import java.io.{ InputStream, OutputStream, File } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts import akka.japi.function +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout import akka.stream.{ javadsl, scaladsl, _ } import akka.util.ByteString @@ -153,6 +154,23 @@ object Sink { def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, Unit] = new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage)) + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * message will be sent to the destination actor. + */ + def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, + onFailureMessage: function.Function[Throwable, Any]): Sink[In, Unit] = + new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply)) + /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 799db83ecc..03e98123c3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -4,9 +4,8 @@ package akka.stream.scaladsl import java.io.{ InputStream, OutputStream, File } - -import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts +import akka.actor.{ Status, ActorRef, Props } import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module @@ -239,6 +238,23 @@ object Sink { def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] = new Sink(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink"))) + /** + * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal. + * First element is always `onInitMessage`, then stream is waiting for acknowledgement message + * `ackMessage` from the given actor which means that it is ready to process + * elements. It also requires `ackMessage` message after each stream element + * to make backpressure work. + * + * If the target actor terminates the stream will be canceled. + * When the stream is completed successfully the given `onCompleteMessage` + * will be sent to the destination actor. + * When the stream is completed with failure - result of `onFailureMessage(throwable)` + * function will be sent to the destination actor. + */ + def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, Unit] = + Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)) + /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 86b134f4ee..2925a62ba6 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -3,23 +3,21 @@ */ package akka.stream.stage -import java.util -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicReference } +import java.util.concurrent.atomic.AtomicReference import akka.actor._ -import akka.actor.dungeon.DeathWatch -import akka.dispatch.sysmsg.{ Unwatch, Watch, DeathWatchNotification, SystemMessage } -import akka.event.{ LoggingAdapter, Logging } -import akka.event.Logging.{ Warning, Debug } +import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch } +import akka.event.LoggingAdapter import akka.stream._ -import akka.stream.impl.{ SeqActorName, ActorMaterializerImpl, ReactiveStreamsCompliance } import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.fusing.{ GraphModule, GraphInterpreter } import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly +import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule } +import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName } + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration -import scala.collection.mutable.ArrayBuffer -import scala.annotation.tailrec abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {