diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala index 9e2b35f836..d11435806b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSource.scala @@ -37,7 +37,8 @@ private object ActorRefBackpressureSource { private[akka] override def createLogicAndMaterializedValue( inheritedAttributes: Attributes, eagerMaterializer: Materializer): (GraphStageLogic, ActorRef) = { - val stage: GraphStageLogic with StageLogging with ActorRefStage = new GraphStageLogic(shape) + val stage: GraphStageLogic with OutHandler with StageLogging with ActorRefStage = new GraphStageLogic(shape) + with OutHandler with StageLogging with ActorRefStage { override protected def logSource: Class[_] = classOf[ActorRefSource[_]] @@ -84,11 +85,9 @@ private object ActorRefBackpressureSource { } } - setHandler(out, new OutHandler { - override def onPull(): Unit = { - tryPush() - } - }) + override def onPull(): Unit = tryPush() + + setHandler(out, this) } (stage, stage.ref)