diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 0df9f951d7..883eaa009c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -50,7 +50,7 @@ class ActorRefBackpressureSinkSpec extends AkkaSpec { def createActor[T](c: Class[T]) = system.actorOf(Props(c, testActor).withDispatcher("akka.test.stream-dispatcher")) - "An ActorRefSink" must { + "An ActorRefBackpressureSink" must { "send the elements to the ActorRef" in assertAllStagesStopped { val fw = createActor(classOf[Fw]) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index b75c97343e..9391ccc0e8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -36,18 +36,12 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa override def keepGoingAfterAllPortsClosed: Boolean = true - private val callback: AsyncCallback[Unit] = getAsyncCallback((_: Unit) ⇒ { - if (!buffer.isEmpty) sendData() - else acknowledgementReceived = true - }) - - private val deathWatchCallback: AsyncCallback[Unit] = - getAsyncCallback((Unit) ⇒ completeStage()) - private def receive(evt: (ActorRef, Any)): Unit = { evt._2 match { - case `ackMessage` ⇒ callback.invoke(()) - case Terminated(`ref`) ⇒ deathWatchCallback.invoke(()) + case `ackMessage` ⇒ + if (!buffer.isEmpty) sendData() + else acknowledgementReceived = true + case Terminated(`ref`) ⇒ completeStage() case _ ⇒ //ignore all other messages } }