From 1296f9986f54a97838bb837852530458abdd4326 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 12 May 2016 13:19:57 +0200 Subject: [PATCH] make control message ingress buffer bounded --- .../src/main/scala/akka/remote/artery/Control.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 56127d956c..a257060ce3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -155,6 +155,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext) import OutboundControlJunction._ private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) + private val maxControlMessageBufferSize: Int = 1024 // FIXME config private val buffer = new ArrayDeque[Send] override def preStart(): Unit = { @@ -180,8 +181,13 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext) private def internalSendControlMessage(message: ControlMessage): Unit = { if (buffer.isEmpty && isAvailable(out)) push(out, wrap(message)) - else + else if (buffer.size < maxControlMessageBufferSize) buffer.offer(wrap(message)) + else { + // it's alright to drop control messages + // FIXME we need that stage logging support + println(s"dropping control message ${message.getClass.getName} due to full buffer") + } } private def wrap(message: ControlMessage): Send =