diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 31d8d88f89..e79a312460 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -486,7 +486,14 @@ akka { # Fully qualified config path which holds the dispatcher configuration # on which file IO tasks are scheduled - file-io-dispatcher = "akka.io.pinned-dispatcher" + file-io-dispatcher = "akka.actor.default-dispatcher" + + # The maximum number of bytes (or "unlimited") to transfer in one batch when using + # `WriteFile` command which uses `FileChannel.transferTo` to pipe files to a TCP socket. + # On some OS like Linux `FileChannel.transferTo` may block for a long time when network + # IO is faster than file IO. Decreasing the value may improve fairness while increasing + # may improve throughput. + file-io-transferTo-limit = 512 KiB } udp { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index ab11280f04..f90aada133 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -189,6 +189,10 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { } val ManagementDispatcher = getString("management-dispatcher") val FileIODispatcher = getString("file-io-dispatcher") + val TransferToLimit = getString("file-io-transferTo-limit") match { + case "unlimited" ⇒ Int.MaxValue + case _ ⇒ getIntBytes("file-io-transferTo-limit") + } require(NrOfSelectors > 0, "nr-of-selectors must be > 0") require(MaxChannels == -1 || MaxChannels > 0, "max-channels must be > 0 or 'unlimited'") diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index a93c3e35b3..ad694295b7 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -344,7 +344,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, new Runnable { def run() { import pendingWrite._ - val writtenBytes = fileChannel.transferTo(currentPosition, remainingBytes, channel) + val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit) + val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel) if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes)) else { // finished