+str #17967 add Sink.actorRefWithAck

This commit is contained in:
Alexander Golubev 2015-10-24 00:07:51 -04:00
parent 15cc65ce9d
commit 87b94202a3
5 changed files with 252 additions and 12 deletions

View file

@ -0,0 +1,115 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.ActorMaterializer
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
object ActorRefBackpressureSinkSpec {
val initMessage = "start"
val completeMessage = "done"
val ackMessage = "ack"
class Fw(ref: ActorRef) extends Actor {
def receive = {
case `initMessage`
sender() ! ackMessage
ref forward initMessage
case `completeMessage`
ref forward completeMessage
case msg: Int
sender() ! ackMessage
ref forward msg
}
}
case object TriggerAckMessage
class Fw2(ref: ActorRef) extends Actor {
var actorRef: ActorRef = Actor.noSender
def receive = {
case TriggerAckMessage
actorRef ! ackMessage
case msg
actorRef = sender()
ref forward msg
}
}
}
class ActorRefBackpressureSinkSpec extends AkkaSpec {
import ActorRefBackpressureSinkSpec._
implicit val mat = ActorMaterializer()
def createActor[T](c: Class[T]) =
system.actorOf(Props(c, testActor).withDispatcher("akka.test.stream-dispatcher"))
"An ActorRefSink" must {
"send the elements to the ActorRef" in assertAllStagesStopped {
val fw = createActor(classOf[Fw])
Source(List(1, 2, 3)).runWith(Sink.actorRefWithAck(fw,
initMessage, ackMessage, completeMessage))
expectMsg("start")
expectMsg(1)
expectMsg(2)
expectMsg(3)
expectMsg(completeMessage)
}
"send the elements to the ActorRef2" in assertAllStagesStopped {
val fw = createActor(classOf[Fw])
val probe = TestSource.probe[Int].to(Sink.actorRefWithAck(fw,
initMessage, ackMessage, completeMessage)).run()
probe.sendNext(1)
expectMsg("start")
expectMsg(1)
probe.sendNext(2)
expectMsg(2)
probe.sendNext(3)
expectMsg(3)
probe.sendComplete()
expectMsg(completeMessage)
}
"cancel stream when actor terminates" in assertAllStagesStopped {
val fw = createActor(classOf[Fw])
val publisher = TestSource.probe[Int].to(Sink.actorRefWithAck(fw,
initMessage, ackMessage, completeMessage)).run().sendNext(1)
expectMsg(initMessage)
expectMsg(1)
system.stop(fw)
publisher.expectCancellation()
}
"send message only when backpressure received" in assertAllStagesStopped {
val fw = createActor(classOf[Fw2])
val publisher = TestSource.probe[Int].to(Sink.actorRefWithAck(fw,
initMessage, ackMessage, completeMessage)).run()
expectMsg(initMessage)
publisher.sendNext(1)
expectNoMsg()
fw ! TriggerAckMessage
expectMsg(1)
publisher.sendNext(2)
publisher.sendNext(3)
publisher.sendComplete()
fw ! TriggerAckMessage
expectMsg(2)
fw ! TriggerAckMessage
expectMsg(3)
expectMsg(completeMessage)
}
}
}

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.util
import akka.actor._
import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Watch }
import akka.stream.stage.GraphStageLogic.StageActorRef
import akka.stream.{ Inlet, SinkShape, ActorMaterializer, Attributes }
import akka.stream.Attributes.InputBuffer
import akka.stream.stage._
/**
* INTERNAL API
*/
private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any,
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: (Throwable) Any)
extends GraphStage[SinkShape[In]] {
val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
override val shape: SinkShape[In] = SinkShape(in)
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
implicit var self: StageActorRef = _
val buffer: util.Deque[In] = new util.ArrayDeque[In]()
var acknowledgementReceived = false
var completeReceived = false
override def keepGoingAfterAllPortsClosed: Boolean = true
private val callback: AsyncCallback[Unit] = getAsyncCallback((_: Unit) {
if (!buffer.isEmpty) sendData()
else acknowledgementReceived = true
})
private val deathWatchCallback: AsyncCallback[Unit] =
getAsyncCallback((Unit) completeStage())
private def receive(evt: (ActorRef, Any)): Unit = {
evt._2 match {
case `ackMessage` callback.invoke(())
case Terminated(`ref`) deathWatchCallback.invoke(())
case _ //ignore all other messages
}
}
override def preStart() = {
self = getStageActorRef(receive)
self.watch(ref)
ref ! onInitMessage
pull(in)
}
private def sendData(): Unit = {
if (!buffer.isEmpty) {
ref ! buffer.poll()
acknowledgementReceived = false
}
if (buffer.isEmpty && completeReceived) finish()
}
private def finish(): Unit = {
ref ! onCompleteMessage
completeStage()
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
buffer offer grab(in)
if (acknowledgementReceived) sendData()
if (buffer.size() < maxBuffer) pull(in)
}
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty) finish()
else completeReceived = true
}
override def onUpstreamFailure(ex: Throwable): Unit = {
ref ! onFailureMessage(ex)
failStage(ex)
}
})
}
override def toString = "ActorRefBackpressureSink"
}

View file

@ -8,6 +8,7 @@ import java.io.{ InputStream, OutputStream, File }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.japi.function
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout
import akka.stream.{ javadsl, scaladsl, _ }
import akka.util.ByteString
@ -153,6 +154,23 @@ object Sink {
def actorRef[In](ref: ActorRef, onCompleteMessage: Any): Sink[In, Unit] =
new Sink(scaladsl.Sink.actorRef[In](ref, onCompleteMessage))
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* message will be sent to the destination actor.
*/
def actorRefWithAck[In](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: function.Function[Throwable, Any]): Sink[In, Unit] =
new Sink(scaladsl.Sink.actorRefWithAck[In](ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage.apply))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should

View file

@ -4,9 +4,8 @@
package akka.stream.scaladsl
import java.io.{ InputStream, OutputStream, File }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.actor.{ Status, ActorRef, Props }
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
@ -239,6 +238,23 @@ object Sink {
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] =
new Sink(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink")))
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: (Throwable) Any = Status.Failure): Sink[T, Unit] =
Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))
/**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must

View file

@ -3,23 +3,21 @@
*/
package akka.stream.stage
import java.util
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicReference }
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
import akka.actor.dungeon.DeathWatch
import akka.dispatch.sysmsg.{ Unwatch, Watch, DeathWatchNotification, SystemMessage }
import akka.event.{ LoggingAdapter, Logging }
import akka.event.Logging.{ Warning, Debug }
import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch }
import akka.event.LoggingAdapter
import akka.stream._
import akka.stream.impl.{ SeqActorName, ActorMaterializerImpl, ReactiveStreamsCompliance }
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.{ GraphModule, GraphInterpreter }
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule }
import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName }
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration
import scala.collection.mutable.ArrayBuffer
import scala.annotation.tailrec
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {