diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index bef3a6a84c..f4a2afd629 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -27,6 +27,9 @@ import akka.util.{ ByteString, Helpers } import akka.testkit.SocketUtil._ import java.util.Random import java.net.SocketTimeoutException +import java.nio.file.Files + +import com.google.common.jimfs.{ Configuration, Jimfs } object TcpConnectionSpec { case class Ack(i: Int) extends Event @@ -129,7 +132,7 @@ class TcpConnectionSpec extends AkkaSpec(""" val wrote = serverSideChannel.write(buffer) wrote should ===(DataSize) - expectNoMsg(1000.millis) // data should have been transferred fully by now + expectNoMessage(1000.millis) // data should have been transferred fully by now selector.send(connectionActor, ChannelReadable) @@ -175,7 +178,7 @@ class TcpConnectionSpec extends AkkaSpec(""" buffer.clear() serverSideChannel.read(buffer) should ===(0) writer.send(connectionActor, unackedWrite) - writer.expectNoMsg(500.millis) + writer.expectNoMessage(500.millis) pullFromServerSide(remaining = 10, into = buffer) buffer.flip() ByteString(buffer).utf8String should ===("morestuff!") @@ -208,7 +211,7 @@ class TcpConnectionSpec extends AkkaSpec(""" run { val writer = TestProbe() writer.send(connectionActor, Write(ByteString(42.toByte))) - writer.expectNoMsg(500.millis) + writer.expectNoMessage(500.millis) } } @@ -224,31 +227,33 @@ class TcpConnectionSpec extends AkkaSpec(""" run { val writer = TestProbe() writer.send(connectionActor, Write(ByteString.empty, NoAck)) - writer.expectNoMsg(250.millis) + writer.expectNoMessage(250.millis) writer.send(connectionActor, Write(ByteString.empty, NoAck(42))) - writer.expectNoMsg(250.millis) + writer.expectNoMessage(250.millis) } } "write file to network" in new EstablishedConnectionTest() { run { - // hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath - val testFile = - classOf[TcpConnectionSpec].getClassLoader.asInstanceOf[URLClassLoader] - .getURLs - .filter(_.getProtocol == "file") - .map(url ⇒ new File(url.toURI)) - .filter(_.exists) - .sortBy(-_.length) - .head - - // maximum of 100 MB - val size = math.min(testFile.length(), 100000000).toInt - - val writer = TestProbe() - writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack)) - pullFromServerSide(size, 1000000) - writer.expectMsg(Ack) + val fs = Jimfs.newFileSystem("write-file-in-network", Configuration.unix()) + val tmpFile = Files.createTempFile(fs.getPath("/"), "whatever", ".dat") + val writer = Files.newBufferedWriter(tmpFile) + val oneKByteOfF = Array.fill[Char](1000)('F') + // 10 mb of f:s in a file + for (_ ← 0 to 10000) { + writer.write(oneKByteOfF) + } + writer.flush() + writer.close() + try { + val writer = TestProbe() + val size = Files.size(tmpFile).toInt + writer.send(connectionActor, WritePath(tmpFile, 0, size, Ack)) + pullFromServerSide(size, 1000000) + writer.expectMsg(Ack) + } finally { + fs.close() + } } } @@ -349,8 +354,8 @@ class TcpConnectionSpec extends AkkaSpec(""" selector.send(connectionActor, ChannelReadable) // this ChannelReadable should be properly ignored, even if data is already pending - interestCallReceiver.expectNoMsg(100.millis) - connectionHandler.expectNoMsg(100.millis) + interestCallReceiver.expectNoMessage(100.millis) + connectionHandler.expectNoMessage(100.millis) connectionHandler.send(connectionActor, ResumeReading) interestCallReceiver.expectMsg(OP_READ) @@ -375,15 +380,15 @@ class TcpConnectionSpec extends AkkaSpec(""" // send a batch that is bigger than the default buffer to make sure we don't recurse and // send more than one Received messages serverSideChannel.write(ByteBuffer.wrap((ts ++ us).getBytes("ASCII"))) - connectionHandler.expectNoMsg(100.millis) + connectionHandler.expectNoMessage(100.millis) connectionActor ! ResumeReading interestCallReceiver.expectMsg(OP_READ) selector.send(connectionActor, ChannelReadable) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(ts) - interestCallReceiver.expectNoMsg(100.millis) - connectionHandler.expectNoMsg(100.millis) + interestCallReceiver.expectNoMessage(100.millis) + connectionHandler.expectNoMessage(100.millis) connectionActor ! ResumeReading interestCallReceiver.expectMsg(OP_READ) @@ -391,8 +396,8 @@ class TcpConnectionSpec extends AkkaSpec(""" connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(us) // make sure that after reading all pending data we don't yet register for reading more data - interestCallReceiver.expectNoMsg(100.millis) - connectionHandler.expectNoMsg(100.millis) + interestCallReceiver.expectNoMessage(100.millis) + connectionHandler.expectNoMessage(100.millis) val vs = "v" * (maxBufferSize / 2) serverSideChannel.write(ByteBuffer.wrap(vs.getBytes("ASCII"))) @@ -438,7 +443,7 @@ class TcpConnectionSpec extends AkkaSpec(""" run { connectionHandler.send(connectionActor, Close) connectionHandler.expectMsg(Closed) - connectionHandler.expectNoMsg(500.millis) + connectionHandler.expectNoMessage(500.millis) } } @@ -517,12 +522,12 @@ class TcpConnectionSpec extends AkkaSpec(""" connectionHandler.send(connectionActor, writeCmd(Ack)) connectionHandler.send(connectionActor, ConfirmedClose) - connectionHandler.expectNoMsg(100.millis) + connectionHandler.expectNoMessage(100.millis) pullFromServerSide(TestSize) connectionHandler.expectMsg(Ack) selector.send(connectionActor, ChannelReadable) - connectionHandler.expectNoMsg(100.millis) // not yet + connectionHandler.expectNoMessage(100.millis) // not yet val buffer = ByteBuffer.allocate(1) serverSelectionKey should be(selectedAs(SelectionKey.OP_READ, 2.seconds)) @@ -592,7 +597,7 @@ class TcpConnectionSpec extends AkkaSpec(""" err.cause should ===(ConnectionResetByPeerMessage) // wait a while - connectionHandler.expectNoMsg(200.millis) + connectionHandler.expectNoMessage(200.millis) assertThisConnectionActorTerminated() } @@ -717,7 +722,7 @@ class TcpConnectionSpec extends AkkaSpec(""" // resuming must not immediately work (queue still full) writer.send(connectionActor, ResumeWriting) - writer.expectNoMsg(1.second) + writer.expectNoMessage(1.second) // so drain the queue until it works again while (!writer.msgAvailable) pullFromServerSide(TestSize) diff --git a/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes index 5a925561ef..4c284b396f 100644 --- a/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.10.backwards.excludes @@ -2,4 +2,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.copy") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this") ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply") + +# Path based WriteFile command #23902 +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.TcpConnection.PendingWriteFile") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 679ce65bf1..a1a2d7f74a 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -17,6 +17,7 @@ import akka.util.{ ByteString, Helpers } import akka.util.Helpers.Requiring import akka.actor._ import java.lang.{ Iterable ⇒ JIterable } +import java.nio.file.Path import akka.annotation.InternalApi @@ -339,6 +340,15 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { if (data.isEmpty) empty else Write(data, NoAck) } + /** + * @see [[WritePath]] + */ + @deprecated("Use WritePath instead", "2.5.10") + final case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand { + require(position >= 0, "WriteFile.position must be >= 0") + require(count > 0, "WriteFile.count must be > 0") + } + /** * Write `count` bytes starting at `position` from file at `filePath` to the connection. * The count must be > 0. The connection actor will reply with a [[CommandFailed]] @@ -349,7 +359,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * or have been sent! Unfortunately there is no way to determine whether * a particular write has been sent by the O/S. */ - final case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand { + final case class WritePath(path: Path, position: Long, count: Long, ack: Event) extends SimpleWriteCommand { require(position >= 0, "WriteFile.position must be >= 0") require(count > 0, "WriteFile.count must be > 0") } diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 40bf18c9ab..8187d08046 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -4,23 +4,24 @@ package akka.io +import java.io.IOException import java.net.{ InetSocketAddress, SocketException } -import java.nio.channels.SelectionKey._ -import java.io.{ FileInputStream, IOException } -import java.nio.channels.{ FileChannel, SocketChannel } import java.nio.ByteBuffer +import java.nio.channels.SelectionKey._ +import java.nio.channels.{ FileChannel, SocketChannel } +import java.nio.file.{ Path, Paths } + +import akka.actor._ +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } +import akka.io.Inet.SocketOption +import akka.io.SelectionHandler._ +import akka.io.Tcp._ +import akka.util.ByteString import scala.annotation.tailrec import scala.collection.immutable -import scala.util.control.{ NoStackTrace, NonFatal } import scala.concurrent.duration._ -import akka.actor._ -import akka.util.ByteString -import akka.io.Inet.SocketOption -import akka.io.Tcp._ -import akka.io.SelectionHandler._ -import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import java.nio.file.Paths +import scala.util.control.{ NoStackTrace, NonFatal } /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. @@ -30,9 +31,9 @@ import java.nio.file.Paths private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel, val pullMode: Boolean) extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { + import TcpConnection._ import tcp.Settings._ import tcp.bufferPool - import TcpConnection._ private[this] var pendingWrite: PendingWrite = EmptyPendingWrite private[this] var peerClosed = false @@ -378,7 +379,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha head match { case Write.empty ⇒ if (tail eq Write.empty) EmptyPendingWrite else create(tail) case Write(data, ack) if data.nonEmpty ⇒ PendingBufferWrite(commander, data, ack, tail) - case WriteFile(path, offset, count, ack) ⇒ PendingWriteFile(commander, path, offset, count, ack, tail) + case WriteFile(path, offset, count, ack) ⇒ PendingWriteFile(commander, Paths.get(path), offset, count, ack, tail) + case WritePath(path, offset, count, ack) ⇒ PendingWriteFile(commander, path, offset, count, ack, tail) case CompoundWrite(h, t) ⇒ create(h, t) case x @ Write(_, ack) ⇒ // empty write with either an ACK or a non-standard NoACK if (x.wantsAck) commander ! ack @@ -438,9 +440,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha def release(): Unit = bufferPool.release(buffer) } - def PendingWriteFile(commander: ActorRef, filePath: String, offset: Long, count: Long, ack: Event, + def PendingWriteFile(commander: ActorRef, filePath: Path, offset: Long, count: Long, ack: Event, tail: WriteCommand): PendingWriteFile = - new PendingWriteFile(commander, FileChannel.open(Paths.get(filePath)), offset, count, ack, tail) + new PendingWriteFile(commander, FileChannel.open(filePath), offset, count, ack, tail) class PendingWriteFile( val commander: ActorRef, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e8173208af..5a0a649aa9 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -130,7 +130,8 @@ object Dependencies { val testkit = l ++= Seq(Test.junit, Test.scalatest.value) ++ Test.metricsAll - val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck.value) + val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath, + Test.mockito, Test.scalacheck.value, Test.jimfs) val remote = l ++= Seq(netty, aeronDriver, aeronClient, Test.junit, Test.scalatest.value, Test.jimfs)