diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index c7658d96dc..cfd46d77c6 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -14,7 +14,7 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.util.control.NonFatal import org.scalatest.matchers._ -import Tcp._ +import akka.io.Tcp._ import akka.io.SelectionHandler._ import TestUtils._ import akka.actor.{ ActorRef, PoisonPill, Terminated } @@ -169,6 +169,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") */ "stop writing in cases of backpressure and resume afterwards" in withEstablishedConnection(clientSocketOptions = List(SO.ReceiveBufferSize(1000000))) { setup ⇒ + info("Currently ignored as SO_SNDBUF is usually a lower bound on the send buffer so the test fails as no real " + + "backpressure present.") + pending ignoreIfWindows() import setup._ object Ack1 diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala index 270711cc76..6cb6042b2f 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala @@ -34,10 +34,8 @@ class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Implici val data = ByteString("To infinity and beyond!") simpleSender ! Send(data, serverAddress) - expectMsgPF() { - case Received(d, _) ⇒ - d must be === data - } + expectMsgType[Received].data must be === data + } "be able to send with binding" in { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index c17594a307..fd5051deb8 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -416,7 +416,7 @@ akka { # The maximal number of direct buffers kept in the direct buffer pool for # reuse. - max-direct-buffer-pool-size = 1000 + direct-buffer-pool-limit = 1000 # The duration a connection actor waits for a `Register` message from # its commander before aborting the connection. @@ -425,7 +425,7 @@ akka { # The maximum number of bytes delivered by a `Received` message. Before # more data is read from the network the connection actor will try to # do other work. - received-message-size-limit = unlimited + max-received-message-size = unlimited # Enable fine grained logging of what goes on inside the implementation. # Be aware that this may log more than once per message sent to the actors @@ -473,7 +473,7 @@ akka { # The maximum number of datagrams that are read in one go, # higher numbers decrease latency, lower numbers increase fairness on # the worker-dispatcher - batch-receive-limit = 3 + receive-throughput = 3 # The number of bytes per direct buffer in the pool used to read or write # network data from the kernel. @@ -481,7 +481,7 @@ akka { # The maximal number of direct buffers kept in the direct buffer pool for # reuse. - max-direct-buffer-pool-size = 1000 + direct-buffer-pool-limit = 1000 # The maximum number of bytes delivered by a `Received` message. Before # more data is read from the network the connection actor will try to @@ -534,7 +534,7 @@ akka { # The maximum number of datagrams that are read in one go, # higher numbers decrease latency, lower numbers increase fairness on # the worker-dispatcher - batch-receive-limit = 3 + receive-throughput = 3 # The number of bytes per direct buffer in the pool used to read or write # network data from the kernel. @@ -542,7 +542,7 @@ akka { # The maximal number of direct buffers kept in the direct buffer pool for # reuse. - max-direct-buffer-pool-size = 1000 + direct-buffer-pool-limit = 1000 # The maximum number of bytes delivered by a `Received` message. Before # more data is read from the network the connection actor will try to diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala index 5b10023990..e238ffbaf2 100644 --- a/akka-actor/src/main/scala/akka/io/IO.scala +++ b/akka-actor/src/main/scala/akka/io/IO.scala @@ -33,7 +33,7 @@ object IO { WorkerForCommand(cmd, commander, props) } - def workerForCommand(pf: PartialFunction[Any, Props]): Receive = { + def workerForCommandHandler(pf: PartialFunction[Any, Props]): Receive = { case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! createWorkerMessage(pf)(cmd) } } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 57c02f0d25..0d1c0d439e 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -94,15 +94,16 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler override def postStop() { try { - val iterator = selector.keys.iterator - while (iterator.hasNext) { - val key = iterator.next() - try key.channel.close() - catch { - case NonFatal(e) ⇒ log.error(e, "Error closing channel") + try { + val iterator = selector.keys.iterator + while (iterator.hasNext) { + val key = iterator.next() + try key.channel.close() + catch { + case NonFatal(e) ⇒ log.error(e, "Error closing channel") + } } - } - selector.close() + } finally selector.close() } catch { case NonFatal(e) ⇒ log.error(e, "Error closing selector") } @@ -112,11 +113,11 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler override def supervisorStrategy = SupervisorStrategy.stoppingStrategy def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = { - log.debug("Executing {}", cmd) + log.debug("Executing [{}]", cmd) if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) { body } else { - log.warning("Rejecting '{}' with {} retries left, retrying...", cmd, retriesLeft) + log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft) context.parent forward Retry(cmd, retriesLeft - 1) } } @@ -198,9 +199,9 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler case OP_READ_AND_WRITE ⇒ connection ! ChannelWritable; connection ! ChannelReadable case x if (x & OP_ACCEPT) > 0 ⇒ connection ! ChannelAcceptable case x if (x & OP_CONNECT) > 0 ⇒ connection ! ChannelConnectable - case x ⇒ log.warning("Invalid readyOps: {}", x) + case x ⇒ log.warning("Invalid readyOps: [{}]", x) } - } else log.warning("Invalid selection key: {}", key) + } else log.warning("Invalid selection key: [{}]", key) } keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected } @@ -217,7 +218,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler try tryRun() catch { case _: java.nio.channels.ClosedSelectorException ⇒ // ok, expected during shutdown - case NonFatal(e) ⇒ log.error(e, "Error during selector management task: {}", e) + case NonFatal(e) ⇒ log.error(e, "Error during selector management task: [{}]", e) } } } diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 95218f16cf..97076477cf 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -16,7 +16,7 @@ import akka.actor._ object Tcp extends ExtensionKey[TcpExt] { // Java API - override def get(system: ActorSystem): TcpExt = system.extension(this) + override def get(system: ActorSystem): TcpExt = super.get(system) // shared socket options object SO extends Inet.SoForwarders { @@ -124,12 +124,12 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val BatchAcceptLimit = getInt("batch-accept-limit") val DirectBufferSize = getIntBytes("direct-buffer-size") - val MaxDirectBufferPoolSize = getInt("max-direct-buffer-pool-size") + val MaxDirectBufferPoolSize = getInt("direct-buffer-pool-limit") val RegisterTimeout = getString("register-timeout") match { case "infinite" ⇒ Duration.Undefined case x ⇒ Duration(x) } - val ReceivedMessageSizeLimit = getString("received-message-size-limit") match { + val ReceivedMessageSizeLimit = getString("max-received-message-size") match { case "unlimited" ⇒ Int.MaxValue case x ⇒ getIntBytes("received-message-size-limit") } @@ -150,7 +150,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { } } - val manager = { + val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( props = Props(new TcpManager(this)).withDispatcher(Settings.ManagementDispatcher), name = "IO-TCP") diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 44e7b29163..12e73bdaa1 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -40,7 +40,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, /** connection established, waiting for registration from user handler */ def waitingForRegistration(commander: ActorRef): Receive = { case Register(handler) ⇒ - if (TraceLogging) log.debug("{} registered as connection handler", handler) + if (TraceLogging) log.debug("[{}] registered as connection handler", handler) doRead(handler, None) // immediately try reading context.setReceiveTimeout(Duration.Undefined) @@ -54,7 +54,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, case ReceiveTimeout ⇒ // after sending `Register` user should watch this actor to make sure // it didn't die because of the timeout - log.warning("Configured registration timeout of {} expired, stopping", RegisterTimeout) + log.warning("Configured registration timeout of [{}] expired, stopping", RegisterTimeout) context.stop(self) } @@ -145,12 +145,12 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, if (TraceLogging) log.debug("Read nothing.") selector ! ReadInterest case GotCompleteData(data) ⇒ - if (TraceLogging) log.debug("Read {} bytes.", data.length) + if (TraceLogging) log.debug("Read [{}] bytes.", data.length) handler ! Received(data) selector ! ReadInterest case MoreDataWaiting(data) ⇒ - if (TraceLogging) log.debug("Read {} bytes. More data waiting.", data.length) + if (TraceLogging) log.debug("Read [{}] bytes. More data waiting.", data.length) handler ! Received(data) self ! ChannelReadable @@ -167,7 +167,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, val toWrite = pendingWrite.buffer.remaining() require(toWrite != 0) val writtenBytes = channel.write(pendingWrite.buffer) - if (TraceLogging) log.debug("Wrote {} bytes to channel", writtenBytes) + if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes) pendingWrite = pendingWrite.consume(writtenBytes) @@ -248,7 +248,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, case NonFatal(e) ⇒ // setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574 // (also affected: OS/X Java 1.6.0_37) - if (TraceLogging) log.debug("setSoLinger(true, 0) failed with {}", e) + if (TraceLogging) log.debug("setSoLinger(true, 0) failed with [{}]", e) } channel.close() } diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 51419fa0a5..d806fe8490 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -45,7 +45,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef, catch { case NonFatal(e) ⇒ bindCommander ! CommandFailed(bind) - log.error(e, "Bind failed for TCP channel") + log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint) context.stop(self) } serverSocketChannel diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index 032bcfd6bc..aa80e96c10 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -45,7 +45,7 @@ import akka.io.IO.SelectorBasedManager */ private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging { - def receive = workerForCommand { + def receive = workerForCommandHandler { case c: Connect ⇒ val commander = sender Props(new TcpOutgoingConnection(tcp, commander, c)) diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 39817efe99..03d978293e 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -31,7 +31,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, def receive: Receive = { case ChannelRegistered ⇒ - log.debug("Attempting connection to {}", remoteAddress) + log.debug("Attempting connection to [{}]", remoteAddress) if (channel.connect(remoteAddress)) completeConnect(commander, options) else { diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 83e8b0d5f1..840dda666d 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -28,8 +28,8 @@ object Udp { val NrOfSelectors = getInt("nr-of-selectors") val DirectBufferSize = getIntBytes("direct-buffer-size") - val MaxDirectBufferPoolSize = getInt("max-direct-buffer-pool-size") - val BatchReceiveLimit = getInt("batch-receive-limit") + val MaxDirectBufferPoolSize = getInt("direct-buffer-pool-limit") + val BatchReceiveLimit = getInt("receive-throughput") val ManagementDispatcher = getString("management-dispatcher") diff --git a/akka-actor/src/main/scala/akka/io/UdpConn.scala b/akka-actor/src/main/scala/akka/io/UdpConn.scala index df6715d2c5..aee429a716 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConn.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConn.scala @@ -12,7 +12,7 @@ import scala.collection.immutable object UdpConn extends ExtensionKey[UdpConnExt] { // Java API - override def get(system: ActorSystem): UdpConnExt = system.extension(this) + override def get(system: ActorSystem): UdpConnExt = super.get(system) trait Command extends IO.HasFailureMessage { def failureMessage = CommandFailed(this) @@ -51,9 +51,9 @@ object UdpConn extends ExtensionKey[UdpConnExt] { class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension { - val settings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) + val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) - val manager = { + val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( props = Props(new UdpConnManager(this)), name = "IO-UDP-CONN") diff --git a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala index a93a21259d..3868289c6b 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala @@ -9,7 +9,7 @@ import akka.io.UdpConn.Connect class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { - def receive = workerForCommand { + def receive = workerForCommandHandler { case c: Connect ⇒ val commander = sender Props(new UdpConnection(udpConn, commander, c)) diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index ff91a464fa..6d52adfb3b 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -33,18 +33,19 @@ private[io] class UdpConnection(val udpConn: UdpConnExt, val socket = datagramChannel.socket options.foreach(_.beforeDatagramBind(socket)) try { - localAddress.foreach { socket.bind _ } // will blow up the actor constructor if the bind fails + localAddress foreach socket.bind datagramChannel.connect(remoteAddress) } catch { case NonFatal(e) ⇒ - log.error(e, "Failure while connecting UDP channel") + log.error(e, "Failure while connecting UDP channel to remote address [{}] local address [{}]", + remoteAddress, localAddress.map { _.toString }.getOrElse("undefined")) commander ! CommandFailed(connect) context.stop(self) } datagramChannel } selector ! RegisterChannel(channel, OP_READ) - log.debug("Successfully connected to {}", remoteAddress) + log.debug("Successfully connected to [{}]", remoteAddress) def receive = { case ChannelRegistered ⇒ @@ -58,10 +59,10 @@ private[io] class UdpConnection(val udpConn: UdpConnExt, case ChannelReadable ⇒ doRead(handler) case Close ⇒ - log.debug("Closing UDP connection to {}", remoteAddress) + log.debug("Closing UDP connection to [{}]", remoteAddress) channel.close() sender ! Disconnected - log.debug("Connection closed to {}, stopping listener", remoteAddress) + log.debug("Connection closed to [{}], stopping listener", remoteAddress) context.stop(self) case send: Send if writePending ⇒ @@ -106,7 +107,7 @@ private[io] class UdpConnection(val udpConn: UdpConnExt, send.payload.copyToBuffer(buffer) buffer.flip() val writtenBytes = channel.write(buffer) - if (TraceLogging) log.debug("Wrote {} bytes to channel", writtenBytes) + if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes) // Datagram channel either sends the whole message, or nothing if (writtenBytes == 0) commander ! CommandFailed(send) diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index f6d86f7053..df935ebbaa 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -13,7 +13,7 @@ import scala.collection.immutable object UdpFF extends ExtensionKey[UdpFFExt] { // Java API - override def get(system: ActorSystem): UdpFFExt = system.extension(this) + override def get(system: ActorSystem): UdpFFExt = super.get(system) trait Command extends IO.HasFailureMessage { def failureMessage = CommandFailed(this) @@ -53,9 +53,9 @@ object UdpFF extends ExtensionKey[UdpFFExt] { class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension { - val settings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) + val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) - val manager = { + val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( props = Props(new UdpFFManager(this)), name = "IO-UDP-FF") diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala index 0edccefab3..add5775832 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala @@ -35,13 +35,13 @@ private[io] class UdpFFListener(val udpFF: UdpFFExt, catch { case NonFatal(e) ⇒ bindCommander ! CommandFailed(bind) - log.error(e, "Failed to bind UDP channel") + log.error(e, "Failed to bind UDP channel to endpoint [{}]", endpoint) context.stop(self) } datagramChannel } context.parent ! RegisterChannel(channel, OP_READ) - log.debug("Successfully bound to {}", endpoint) + log.debug("Successfully bound to [{}]", endpoint) def receive: Receive = { case ChannelRegistered ⇒ @@ -55,11 +55,12 @@ private[io] class UdpFFListener(val udpFF: UdpFFExt, case ChannelReadable ⇒ doReceive(handler) case Unbind ⇒ - log.debug("Unbinding endpoint {}", endpoint) - channel.close() - sender ! Unbound - log.debug("Unbound endpoint {}, stopping listener", endpoint) - context.stop(self) + log.debug("Unbinding endpoint [{}]", endpoint) + try { + channel.close() + sender ! Unbound + log.debug("Unbound endpoint [{}], stopping listener", endpoint) + } finally context.stop(self) } def doReceive(handler: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index 8e3e03617f..16d835ae49 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -44,7 +44,7 @@ import akka.io.UdpFF._ */ private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) { - def receive = workerForCommand { + def receive = workerForCommandHandler { case b: Bind ⇒ val commander = sender Props(new UdpFFListener(udpFF, commander, b)) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala index 5303aa79d9..1120efba33 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala @@ -9,6 +9,7 @@ import akka.io.UdpFF._ import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } import scala.collection.immutable import akka.io.Inet.SocketOption +import scala.util.control.NonFatal /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. @@ -23,9 +24,7 @@ private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversabl datagramChannel.configureBlocking(false) val socket = datagramChannel.socket - options foreach { o ⇒ - o.beforeDatagramBind(socket) - } + options foreach { _.beforeDatagramBind(socket) } datagramChannel } @@ -37,7 +36,13 @@ private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversabl commander ! SimpleSendReady } - override def postStop(): Unit = if (channel.isOpen) channel.close() + override def postStop(): Unit = if (channel.isOpen) { + log.debug("Closing DatagramChannel after being stopped") + try channel.close() + catch { + case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel") + } + } override def postRestart(reason: Throwable): Unit = throw new IllegalStateException("Restarting not supported for connection actors.") diff --git a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala index 09dd37666c..99ed9393e2 100644 --- a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala +++ b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala @@ -11,11 +11,12 @@ import java.nio.channels.DatagramChannel private[io] trait WithUdpFFSend { me: Actor with ActorLogging ⇒ - var pendingSend: (Send, ActorRef) = null + var pendingSend: Send = null + var pendingCommander: ActorRef = null // If send fails first, we allow a second go after selected writable, but no more. This flag signals that // pending send was already tried once. var retriedSend = false - def writePending = pendingSend ne null + def hasWritePending = pendingSend ne null def selector: ActorRef def channel: DatagramChannel @@ -26,7 +27,7 @@ private[io] trait WithUdpFFSend { def sendHandlers: Receive = { - case send: Send if writePending ⇒ + case send: Send if hasWritePending ⇒ if (TraceLogging) log.debug("Dropping write because queue is full") sender ! CommandFailed(send) @@ -35,10 +36,11 @@ private[io] trait WithUdpFFSend { sender ! send.ack case send: Send ⇒ - pendingSend = (send, sender) + pendingSend = send + pendingCommander = sender doSend() - case ChannelWritable ⇒ doSend() + case ChannelWritable ⇒ if (hasWritePending) doSend() } @@ -46,24 +48,24 @@ private[io] trait WithUdpFFSend { val buffer = udpFF.bufferPool.acquire() try { - val (send, commander) = pendingSend buffer.clear() - send.payload.copyToBuffer(buffer) + pendingSend.payload.copyToBuffer(buffer) buffer.flip() - val writtenBytes = channel.send(buffer, send.target) - if (TraceLogging) log.debug("Wrote {} bytes to channel", writtenBytes) + val writtenBytes = channel.send(buffer, pendingSend.target) + if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes) // Datagram channel either sends the whole message, or nothing if (writtenBytes == 0) { if (retriedSend) { - commander ! CommandFailed(send) + pendingCommander ! CommandFailed(pendingSend) retriedSend = false pendingSend = null + pendingCommander = null } else { selector ! WriteInterest retriedSend = true } - } else if (send.wantsAck) commander ! send.ack + } else if (pendingSend.wantsAck) pendingCommander ! pendingSend.ack } finally { udpFF.bufferPool.release(buffer)