Adds variant accepting any ack message (#29586) (#29910)

This commit is contained in:
Nicolas Vollmar 2020-12-22 09:46:38 +01:00 committed by GitHub
parent f68f0cd805
commit 40713f836f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 189 additions and 17 deletions

View file

@ -23,6 +23,7 @@ This operator is included in:
## Description
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.
There is also a variant without a concrete acknowledge message accepting any message as such.
See also:

View file

@ -12,6 +12,7 @@ Send the elements from the stream to an `ActorRef` (of the classic actors API) w
Send the elements from the stream to an `ActorRef` which must then acknowledge reception after completing a message,
to provide back pressure onto the sink.
There is also a variant without a concrete acknowledge message accepting any message as such.
See also:

View file

@ -124,6 +124,30 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
expectMsg(completeMessage)
}
"send message only when backpressure received with any ack message" in assertAllStagesStopped {
val fw = createActor(classOf[Fw2])
val publisher = TestSource
.probe[Int]
.to(Sink.actorRefWithBackpressure(fw, initMessage, completeMessage, _ => failMessage))
.run()
expectMsg(initMessage)
publisher.sendNext(1)
expectNoMessage(200.millis)
fw ! TriggerAckMessage
expectMsg(1)
publisher.sendNext(2)
publisher.sendNext(3)
publisher.sendComplete()
fw ! TriggerAckMessage
expectMsg(2)
fw ! TriggerAckMessage
expectMsg(3)
expectMsg(completeMessage)
}
"keep on sending even after the buffer has been full" in assertAllStagesStopped {
val bufferSize = 16
val streamElementCount = bufferSize + 4

View file

@ -73,4 +73,37 @@ object ActorSink {
onFailureMessage.apply)
.asJava
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* from the given actor which means that it is ready to process
* elements. It also requires an ack message after each stream element
* to make backpressure work. This variant will consider any message as ack message.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*
* @param ref the receiving actor as `ActorRef<T>` (where `T` must include the control messages below)
* @param messageAdapter a function that wraps the stream elements to be sent to the actor together with an `ActorRef[A]` which accepts the ack message
* @param onInitMessage a function that wraps an `ActorRef<A>` into a messages to couple the receiving actor to the sink
* @param onCompleteMessage the message to be sent to the actor when the stream completes
* @param onFailureMessage a function that creates a message to be sent to the actor in case the stream fails from a `Throwable`
*/
def actorRefWithBackpressure[T, M, A](
ref: ActorRef[M],
messageAdapter: akka.japi.function.Function2[ActorRef[A], T, M],
onInitMessage: akka.japi.function.Function[ActorRef[A], M],
onCompleteMessage: M,
onFailureMessage: akka.japi.function.Function[Throwable, M]): Sink[T, NotUsed] =
typed.scaladsl.ActorSink
.actorRefWithBackpressure(
ref,
messageAdapter.apply,
onInitMessage.apply,
onCompleteMessage,
onFailureMessage.apply)
.asJava
}

View file

@ -64,7 +64,40 @@ object ActorSink {
ref.toClassic,
messageAdapter.curried.compose(actorRefAdapter),
onInitMessage.compose(actorRefAdapter),
ackMessage,
Some(ackMessage),
onCompleteMessage,
onFailureMessage)
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signals.
* The first element is always `onInitMessage`, then stream is waiting for acknowledgement message
* from the given actor which means that it is ready to process elements.
* It also requires an ack message after each stream element
* to make backpressure work. This variant will consider any message as ack message.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*
* @param ref the receiving actor as `ActorRef[T]` (where `T` must include the control messages below)
* @param messageAdapter a function that wraps the stream elements to be sent to the actor together with an `ActorRef[A]` which accepts the ack message
* @param onInitMessage a function that wraps an `ActorRef[A]` into a messages to couple the receiving actor to the sink
* @param onCompleteMessage the message to be sent to the actor when the stream completes
* @param onFailureMessage a function that creates a message to be sent to the actor in case the stream fails from a `Throwable`
*/
def actorRefWithBackpressure[T, M, A](
ref: ActorRef[M],
messageAdapter: (ActorRef[A], T) => M,
onInitMessage: ActorRef[A] => M,
onCompleteMessage: M,
onFailureMessage: Throwable => M): Sink[T, NotUsed] =
Sink.actorRefWithAck(
ref.toClassic,
messageAdapter.curried.compose(actorRefAdapter),
onInitMessage.compose(actorRefAdapter),
None,
onCompleteMessage,
onFailureMessage)

View file

@ -80,6 +80,43 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike
in.offer("Swobu!")
p.expectMessageType[Msg].msg shouldBe "Swobu!"
}
"obey protocol without specific ack message" in {
val p = TestProbe[AckProto]()
val autoPilot = Behaviors.receiveMessage[AckProto] {
case m @ Init(sender) =>
p.ref ! m
sender ! "ACK"
Behaviors.same
case m @ Msg(sender, _) =>
p.ref ! m
sender ! "ACK"
Behaviors.same
case m =>
p.ref ! m
Behaviors.same
}
val pilotRef: ActorRef[AckProto] = spawn(autoPilot)
val in =
Source
.queue[String](10, OverflowStrategy.dropBuffer)
.to(ActorSink.actorRefWithBackpressure(pilotRef, Msg.apply, Init.apply, Complete, _ => Failed))
.run()
p.expectMessageType[Init]
in.offer("Dabu!")
p.expectMessageType[Msg].msg shouldBe "Dabu!"
in.offer("Lok'tar!")
p.expectMessageType[Msg].msg shouldBe "Lok'tar!"
in.offer("Swobu!")
p.expectMessageType[Msg].msg shouldBe "Swobu!"
}
}
"ActorSource" should {

View file

@ -0,0 +1,2 @@
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.scaladsl.Sink.actorRefWithAck")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.ActorRefBackpressureSinkStage.this")

View file

@ -5,7 +5,6 @@
package akka.stream.impl
import java.util
import akka.actor._
import akka.annotation.InternalApi
import akka.stream._
@ -20,29 +19,30 @@ import akka.stream.stage._
ref: ActorRef,
messageAdapter: ActorRef => In => Any,
onInitMessage: ActorRef => Any,
ackMessage: Any,
ackMessage: Option[Any],
onCompleteMessage: Any,
onFailureMessage: (Throwable) => Any)
extends GraphStage[SinkShape[In]] {
val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
override def initialAttributes = DefaultAttributes.actorRefWithBackpressureSink
override def initialAttributes: Attributes = DefaultAttributes.actorRefWithBackpressureSink
override val shape: SinkShape[In] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
implicit def self: ActorRef = stageActor.ref
val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
private val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
val buffer: util.Deque[In] = new util.ArrayDeque[In]()
var acknowledgementReceived = false
var completeReceived = false
var completionSignalled = false
private val buffer: util.Deque[In] = new util.ArrayDeque[In]()
private var acknowledgementReceived = false
private var completeReceived = false
private var completionSignalled = false
private def receive(evt: (ActorRef, Any)): Unit = {
evt._2 match {
case `ackMessage` => {
case Terminated(`ref`) => completeStage()
case ackMsg if ackMessage.isEmpty || ackMessage.contains(ackMsg) =>
if (buffer.isEmpty) acknowledgementReceived = true
else {
// onPush might have filled the buffer up and
@ -50,13 +50,11 @@ import akka.stream.stage._
if (buffer.size() == maxBuffer) tryPull(in)
dequeueAndSend()
}
}
case Terminated(`ref`) => completeStage()
case _ => //ignore all other messages
case _ => //ignore all other messages
}
}
override def preStart() = {
override def preStart(): Unit = {
setKeepGoing(true)
getStageActor(receive).watch(ref)
ref ! onInitMessage(self)

View file

@ -259,6 +259,27 @@ object Sink {
scaladsl.Sink
.actorRefWithBackpressure[In](ref, onInitMessage, ackMessage, onCompleteMessage, t => onFailureMessage(t)))
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* from the given actor which means that it is ready to process
* elements. It also requires an ack message after each stream element
* to make backpressure work. This variant will consider any message as ack message.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* message will be sent to the destination actor.
*/
def actorRefWithBackpressure[In](
ref: ActorRef,
onInitMessage: Any,
onCompleteMessage: Any,
onFailureMessage: function.Function[Throwable, Any]): Sink[In, NotUsed] =
new Sink(
scaladsl.Sink.actorRefWithBackpressure[In](ref, onInitMessage, onCompleteMessage, t => onFailureMessage(t)))
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message

View file

@ -484,6 +484,7 @@ object Sink {
* `ackMessage` from the given actor which means that it is ready to process
* elements. It also requires `ackMessage` message after each stream element
* to make backpressure work.
* If `ackMessage` is empty any message will be considered an acknowledgement message.
*
* Every message that is sent to the actor is first transformed using `messageAdapter`.
* This can be used to capture the ActorRef of the actor that expects acknowledgments as
@ -499,7 +500,7 @@ object Sink {
ref: ActorRef,
messageAdapter: ActorRef => T => Any,
onInitMessage: ActorRef => Any,
ackMessage: Any,
ackMessage: Option[Any],
onCompleteMessage: Any,
onFailureMessage: (Throwable) => Any): Sink[T, NotUsed] =
Sink.fromGraph(
@ -530,7 +531,27 @@ object Sink {
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: Throwable => Any): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
actorRefWithAck(ref, _ => identity, _ => onInitMessage, Some(ackMessage), onCompleteMessage, onFailureMessage)
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
* First element is always `onInitMessage`, then stream is waiting for acknowledgement message
* from the given actor which means that it is ready to process
* elements. It also requires an ack message after each stream element
* to make backpressure work. This variant will consider any message as ack message.
*
* If the target actor terminates the stream will be canceled.
* When the stream is completed successfully the given `onCompleteMessage`
* will be sent to the destination actor.
* When the stream is completed with failure - result of `onFailureMessage(throwable)`
* function will be sent to the destination actor.
*/
def actorRefWithBackpressure[T](
ref: ActorRef,
onInitMessage: Any,
onCompleteMessage: Any,
onFailureMessage: Throwable => Any): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, None, onCompleteMessage, onFailureMessage)
/**
* Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
@ -552,7 +573,7 @@ object Sink {
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
actorRefWithAck(ref, _ => identity, _ => onInitMessage, Some(ackMessage), onCompleteMessage, onFailureMessage)
/**
* Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueueWithCancel]].

View file

@ -12,6 +12,7 @@ enablePlugins(
JavaFormatterPlugin)
disablePlugins(MimaPlugin)
// check format and headers
TaskKey[Unit]("verifyCodeFmt") := {
javafmtCheckAll.all(ScopeFilter(inAnyProject)).result.value.toEither.left.foreach { _ =>