diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 83974579f9..cdce29a1ac 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -4,8 +4,8 @@ package akka.io -import java.io.IOException -import java.net.{ ConnectException, InetSocketAddress, SocketException } +import java.io.{ FileOutputStream, File, IOException } +import java.net.{ URLClassLoader, ConnectException, InetSocketAddress, SocketException } import java.nio.ByteBuffer import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel } import java.nio.channels.spi.SelectorProvider @@ -190,6 +190,29 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") writer.expectMsg(Ack) } + "write file to network" in withEstablishedConnection() { setup ⇒ + import setup._ + + // hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath + val testFile = + classOf[TcpConnectionSpec].getClassLoader.asInstanceOf[URLClassLoader] + .getURLs + .filter(_.getProtocol == "file") + .map(url ⇒ new File(url.toURI)) + .filter(_.exists) + .sortBy(-_.length) + .head + + // maximum of 100 MB + val size = math.min(testFile.length(), 100000000).toInt + + object Ack + val writer = TestProbe() + writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack)) + pullFromServerSide(size, 1000000) + writer.expectMsg(Ack) + } + /* * Disabled on Windows: http://support.microsoft.com/kb/214397 * @@ -235,8 +258,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") writer.expectMsg(CommandFailed(secondWrite)) // reject even empty writes - writer.send(connectionActor, Write.Empty) - writer.expectMsg(CommandFailed(Write.Empty)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) // there will be immediately more space in the send buffer because // some data will have been sent by now, so we assume we can write @@ -607,7 +630,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") */ @tailrec final def pullFromServerSide(remaining: Int, remainingTries: Int = 1000): Unit = if (remainingTries <= 0) - throw new AssertionError("Pulling took too many loops") + throw new AssertionError("Pulling took too many loops, remaining data: " + remaining) else if (remaining > 0) { if (selector.msgAvailable) { selector.expectMsg(WriteInterest) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index fe6844b6e0..e79a312460 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -483,6 +483,17 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the selector management actors management-dispatcher = "akka.actor.default-dispatcher" + + # Fully qualified config path which holds the dispatcher configuration + # on which file IO tasks are scheduled + file-io-dispatcher = "akka.actor.default-dispatcher" + + # The maximum number of bytes (or "unlimited") to transfer in one batch when using + # `WriteFile` command which uses `FileChannel.transferTo` to pipe files to a TCP socket. + # On some OS like Linux `FileChannel.transferTo` may block for a long time when network + # IO is faster than file IO. Decreasing the value may improve fairness while increasing + # may improve throughput. + file-io-transferTo-limit = 512 KiB } udp { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 52cc39358d..89ad3e1644 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -92,19 +92,42 @@ object Tcp extends ExtensionKey[TcpExt] { case class NoAck(token: Any) object NoAck extends NoAck(null) + sealed trait WriteCommand extends Command { + require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") + + def ack: Any + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] + } + /** * Write data to the TCP connection. If no ack is needed use the special * `NoAck` object. */ - case class Write(data: ByteString, ack: Any) extends Command { - require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - - def wantsAck: Boolean = !ack.isInstanceOf[NoAck] - } + case class Write(data: ByteString, ack: Any) extends WriteCommand object Write { - val Empty: Write = Write(ByteString.empty, NoAck) + /** + * The empty Write doesn't write anything and isn't acknowledged. + * It will, however, be denied and sent back with `CommandFailed` if the + * connection isn't currently ready to send any data (because another WriteCommand + * is still pending). + */ + val empty: Write = Write(ByteString.empty, NoAck) + + /** + * Create a new unacknowledged Write command with the given data. + */ def apply(data: ByteString): Write = - if (data.isEmpty) Empty else Write(data, NoAck) + if (data.isEmpty) empty else Write(data, NoAck) + } + + /** + * Write `count` bytes starting at `position` from file at `filePath` to the connection. + * When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. The + * count must be > 0. + */ + case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand { + require(position >= 0, "WriteFile.position must be >= 0") + require(count > 0, "WriteFile.count must be > 0") } case object StopReading extends Command @@ -164,6 +187,11 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { case x ⇒ getIntBytes("received-message-size-limit") } val ManagementDispatcher = getString("management-dispatcher") + val FileIODispatcher = getString("file-io-dispatcher") + val TransferToLimit = getString("file-io-transferTo-limit") match { + case "unlimited" ⇒ Int.MaxValue + case _ ⇒ getIntBytes("file-io-transferTo-limit") + } require(NrOfSelectors > 0, "nr-of-selectors must be > 0") require(MaxChannels == -1 || MaxChannels > 0, "max-channels must be > 0 or 'unlimited'") @@ -187,6 +215,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { } val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize) + val fileIoDispatcher = system.dispatchers.lookup(Settings.FileIODispatcher) } object TcpSO extends SoJavaFactories { diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 9e5629adbb..83bd842272 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -5,8 +5,8 @@ package akka.io import java.net.InetSocketAddress -import java.io.IOException -import java.nio.channels.SocketChannel +import java.io.{ FileInputStream, IOException } +import java.nio.channels.{ FileChannel, SocketChannel } import java.nio.ByteBuffer import scala.annotation.tailrec import scala.collection.immutable @@ -92,8 +92,11 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, doWrite(handler) if (!writePending) // writing is now finished handleClose(handler, closeCommander, closedEvent) + case SendBufferFull(remaining) ⇒ pendingWrite = remaining; selector ! WriteInterest + case WriteFileFinished ⇒ pendingWrite = null; handleClose(handler, closeCommander, closedEvent) + case WriteFileFailed(e) ⇒ handleError(handler, e) // rethrow exception from dispatcher task - case Abort ⇒ handleClose(handler, Some(sender), Aborted) + case Abort ⇒ handleClose(handler, Some(sender), Aborted) } /** connection is closed on our side and we're waiting from confirmation from the other side */ @@ -107,7 +110,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, def handleWriteMessages(handler: ActorRef): Receive = { case ChannelWritable ⇒ if (writePending) doWrite(handler) - case write: Write if writePending ⇒ + case write: WriteCommand if writePending ⇒ if (TraceLogging) log.debug("Dropping write because queue is full") sender ! write.failureMessage @@ -115,9 +118,13 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, if (write.wantsAck) sender ! write.ack - case write: Write ⇒ + case write: WriteCommand ⇒ pendingWrite = createWrite(write) doWrite(handler) + + case SendBufferFull(remaining) ⇒ pendingWrite = remaining; selector ! WriteInterest + case WriteFileFinished ⇒ pendingWrite = null + case WriteFileFailed(e) ⇒ handleError(handler, e) // rethrow exception from dispatcher task } // AUXILIARIES and IMPLEMENTATION @@ -173,32 +180,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } finally bufferPool.release(buffer) } - final def doWrite(handler: ActorRef): Unit = { - @tailrec def innerWrite(): Unit = { - val toWrite = pendingWrite.buffer.remaining() - require(toWrite != 0) - val writtenBytes = channel.write(pendingWrite.buffer) - if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes) - - pendingWrite = pendingWrite.consume(writtenBytes) - - if (pendingWrite.hasData) - if (writtenBytes == toWrite) innerWrite() // wrote complete buffer, try again now - else selector ! WriteInterest // try again later - else { // everything written - if (pendingWrite.wantsAck) - pendingWrite.commander ! pendingWrite.ack - - val buffer = pendingWrite.buffer - pendingWrite = null - - bufferPool.release(buffer) - } - } - - try innerWrite() - catch { case e: IOException ⇒ handleError(handler, e) } - } + def doWrite(handler: ActorRef): Unit = + pendingWrite = pendingWrite.doWrite(handler) def closeReason = if (channel.socket.isOutputShutdown) ConfirmedClosed @@ -238,7 +221,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, context.stop(self) } - def handleError(handler: ActorRef, exception: IOException): Unit = { + def handleError(handler: ActorRef, exception: IOException): Nothing = { closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception))) throw exception @@ -267,8 +250,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, if (channel.isOpen) abort() - if (writePending) - bufferPool.release(pendingWrite.buffer) + if (writePending) pendingWrite.release() if (closedMessage != null) { val interestedInClose = @@ -282,30 +264,113 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, override def postRestart(reason: Throwable): Unit = throw new IllegalStateException("Restarting not supported for connection actors.") - private[io] case class PendingWrite( + /** Create a pending write from a WriteCommand */ + private[io] def createWrite(write: WriteCommand): PendingWrite = write match { + case write: Write ⇒ + val buffer = bufferPool.acquire() + + try { + val copied = write.data.copyToBuffer(buffer) + buffer.flip() + + PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer) + } catch { + case NonFatal(e) ⇒ + bufferPool.release(buffer) + throw e + } + case write: WriteFile ⇒ + PendingWriteFile(sender, write, new FileInputStream(write.filePath).getChannel, 0L) + } + + private[io] case class PendingBufferWrite( commander: ActorRef, ack: Any, remainingData: ByteString, - buffer: ByteBuffer) { + buffer: ByteBuffer) extends PendingWrite { - def consume(writtenBytes: Int): PendingWrite = - if (buffer.remaining() == 0) { + def release(): Unit = bufferPool.release(buffer) + + def doWrite(handler: ActorRef): PendingWrite = { + @tailrec def innerWrite(pendingWrite: PendingBufferWrite): PendingWrite = { + val toWrite = pendingWrite.buffer.remaining() + require(toWrite != 0) + val writtenBytes = channel.write(pendingWrite.buffer) + if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes) + + val nextWrite = pendingWrite.consume(writtenBytes) + + if (pendingWrite.hasData) + if (writtenBytes == toWrite) innerWrite(nextWrite) // wrote complete buffer, try again now + else { + selector ! WriteInterest + nextWrite + } // try again later + else { // everything written + if (pendingWrite.wantsAck) + pendingWrite.commander ! pendingWrite.ack + + pendingWrite.release() + null + } + } + + try innerWrite(this) + catch { case e: IOException ⇒ handleError(handler, e) } + } + def hasData = buffer.hasRemaining || remainingData.nonEmpty + def consume(writtenBytes: Int): PendingBufferWrite = + if (buffer.hasRemaining) this + else { buffer.clear() val copied = remainingData.copyToBuffer(buffer) buffer.flip() copy(remainingData = remainingData.drop(copied)) - } else this - - def hasData = buffer.remaining() > 0 || remainingData.size > 0 - def wantsAck = !ack.isInstanceOf[NoAck] + } } - def createWrite(write: Write): PendingWrite = { - val buffer = bufferPool.acquire() - val copied = write.data.copyToBuffer(buffer) - buffer.flip() - PendingWrite(sender, write.ack, write.data.drop(copied), buffer) + private[io] case class PendingWriteFile( + commander: ActorRef, + write: WriteFile, + fileChannel: FileChannel, + alreadyWritten: Long) extends PendingWrite { + + def doWrite(handler: ActorRef): PendingWrite = { + tcp.fileIoDispatcher.execute(writeFileRunnable(this)) + this + } + + def ack: Any = write.ack + + /** Release any open resources */ + def release() { fileChannel.close() } + + def updatedWrite(nowWritten: Long): PendingWriteFile = { + require(nowWritten < write.count) + copy(alreadyWritten = nowWritten) + } + + def remainingBytes = write.count - alreadyWritten + def currentPosition = write.position + alreadyWritten } + private[io] def writeFileRunnable(pendingWrite: PendingWriteFile): Runnable = + new Runnable { + def run(): Unit = try { + import pendingWrite._ + val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit) + val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel) + + if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes)) + else { // finished + if (wantsAck) commander ! write.ack + self ! WriteFileFinished + + pendingWrite.release() + } + } catch { + case e: IOException ⇒ self ! WriteFileFailed(e) + } + } } /** @@ -324,4 +389,25 @@ private[io] object TcpConnection { case class CloseInformation( notificationsTo: Set[ActorRef], closedEvent: Event) + + // INTERNAL MESSAGES + + /** Informs actor that no writing was possible but there is still work remaining */ + case class SendBufferFull(remainingWrite: PendingWrite) + /** Informs actor that a pending file write has finished */ + case object WriteFileFinished + /** Informs actor that a pending WriteFile failed */ + case class WriteFileFailed(e: IOException) + + /** Abstraction over pending writes */ + trait PendingWrite { + def commander: ActorRef + def ack: Any + + def wantsAck = !ack.isInstanceOf[NoAck] + def doWrite(handler: ActorRef): PendingWrite + + /** Release any open resources */ + def release(): Unit + } }