Merge pull request #19163 from agolubev/agolubev-#17967-Sink.actorRef-with-Acking
Agolubev #17967 sink.actor ref with acking 2
This commit is contained in:
commit
e4d4fe34bf
2 changed files with 5 additions and 11 deletions
|
|
@ -50,7 +50,7 @@ class ActorRefBackpressureSinkSpec extends AkkaSpec {
|
|||
def createActor[T](c: Class[T]) =
|
||||
system.actorOf(Props(c, testActor).withDispatcher("akka.test.stream-dispatcher"))
|
||||
|
||||
"An ActorRefSink" must {
|
||||
"An ActorRefBackpressureSink" must {
|
||||
|
||||
"send the elements to the ActorRef" in assertAllStagesStopped {
|
||||
val fw = createActor(classOf[Fw])
|
||||
|
|
|
|||
|
|
@ -36,18 +36,12 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa
|
|||
|
||||
override def keepGoingAfterAllPortsClosed: Boolean = true
|
||||
|
||||
private val callback: AsyncCallback[Unit] = getAsyncCallback((_: Unit) ⇒ {
|
||||
if (!buffer.isEmpty) sendData()
|
||||
else acknowledgementReceived = true
|
||||
})
|
||||
|
||||
private val deathWatchCallback: AsyncCallback[Unit] =
|
||||
getAsyncCallback((Unit) ⇒ completeStage())
|
||||
|
||||
private def receive(evt: (ActorRef, Any)): Unit = {
|
||||
evt._2 match {
|
||||
case `ackMessage` ⇒ callback.invoke(())
|
||||
case Terminated(`ref`) ⇒ deathWatchCallback.invoke(())
|
||||
case `ackMessage` ⇒
|
||||
if (!buffer.isEmpty) sendData()
|
||||
else acknowledgementReceived = true
|
||||
case Terminated(`ref`) ⇒ completeStage()
|
||||
case _ ⇒ //ignore all other messages
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue