Generate file to use instead of assuming class loader type #23902
Makes the test pass on JDK 9 where the class loader assumption is not correct anymore.
This commit is contained in:
parent
ccf5d46a58
commit
d3b625616a
5 changed files with 73 additions and 52 deletions
|
|
@ -27,6 +27,9 @@ import akka.util.{ ByteString, Helpers }
|
|||
import akka.testkit.SocketUtil._
|
||||
import java.util.Random
|
||||
import java.net.SocketTimeoutException
|
||||
import java.nio.file.Files
|
||||
|
||||
import com.google.common.jimfs.{ Configuration, Jimfs }
|
||||
|
||||
object TcpConnectionSpec {
|
||||
case class Ack(i: Int) extends Event
|
||||
|
|
@ -129,7 +132,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
val wrote = serverSideChannel.write(buffer)
|
||||
wrote should ===(DataSize)
|
||||
|
||||
expectNoMsg(1000.millis) // data should have been transferred fully by now
|
||||
expectNoMessage(1000.millis) // data should have been transferred fully by now
|
||||
|
||||
selector.send(connectionActor, ChannelReadable)
|
||||
|
||||
|
|
@ -175,7 +178,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
buffer.clear()
|
||||
serverSideChannel.read(buffer) should ===(0)
|
||||
writer.send(connectionActor, unackedWrite)
|
||||
writer.expectNoMsg(500.millis)
|
||||
writer.expectNoMessage(500.millis)
|
||||
pullFromServerSide(remaining = 10, into = buffer)
|
||||
buffer.flip()
|
||||
ByteString(buffer).utf8String should ===("morestuff!")
|
||||
|
|
@ -208,7 +211,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
run {
|
||||
val writer = TestProbe()
|
||||
writer.send(connectionActor, Write(ByteString(42.toByte)))
|
||||
writer.expectNoMsg(500.millis)
|
||||
writer.expectNoMessage(500.millis)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -224,31 +227,33 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
run {
|
||||
val writer = TestProbe()
|
||||
writer.send(connectionActor, Write(ByteString.empty, NoAck))
|
||||
writer.expectNoMsg(250.millis)
|
||||
writer.expectNoMessage(250.millis)
|
||||
writer.send(connectionActor, Write(ByteString.empty, NoAck(42)))
|
||||
writer.expectNoMsg(250.millis)
|
||||
writer.expectNoMessage(250.millis)
|
||||
}
|
||||
}
|
||||
|
||||
"write file to network" in new EstablishedConnectionTest() {
|
||||
run {
|
||||
// hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath
|
||||
val testFile =
|
||||
classOf[TcpConnectionSpec].getClassLoader.asInstanceOf[URLClassLoader]
|
||||
.getURLs
|
||||
.filter(_.getProtocol == "file")
|
||||
.map(url ⇒ new File(url.toURI))
|
||||
.filter(_.exists)
|
||||
.sortBy(-_.length)
|
||||
.head
|
||||
|
||||
// maximum of 100 MB
|
||||
val size = math.min(testFile.length(), 100000000).toInt
|
||||
|
||||
val fs = Jimfs.newFileSystem("write-file-in-network", Configuration.unix())
|
||||
val tmpFile = Files.createTempFile(fs.getPath("/"), "whatever", ".dat")
|
||||
val writer = Files.newBufferedWriter(tmpFile)
|
||||
val oneKByteOfF = Array.fill[Char](1000)('F')
|
||||
// 10 mb of f:s in a file
|
||||
for (_ ← 0 to 10000) {
|
||||
writer.write(oneKByteOfF)
|
||||
}
|
||||
writer.flush()
|
||||
writer.close()
|
||||
try {
|
||||
val writer = TestProbe()
|
||||
writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack))
|
||||
val size = Files.size(tmpFile).toInt
|
||||
writer.send(connectionActor, WritePath(tmpFile, 0, size, Ack))
|
||||
pullFromServerSide(size, 1000000)
|
||||
writer.expectMsg(Ack)
|
||||
} finally {
|
||||
fs.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -349,8 +354,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
selector.send(connectionActor, ChannelReadable)
|
||||
|
||||
// this ChannelReadable should be properly ignored, even if data is already pending
|
||||
interestCallReceiver.expectNoMsg(100.millis)
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
interestCallReceiver.expectNoMessage(100.millis)
|
||||
connectionHandler.expectNoMessage(100.millis)
|
||||
|
||||
connectionHandler.send(connectionActor, ResumeReading)
|
||||
interestCallReceiver.expectMsg(OP_READ)
|
||||
|
|
@ -375,15 +380,15 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
// send a batch that is bigger than the default buffer to make sure we don't recurse and
|
||||
// send more than one Received messages
|
||||
serverSideChannel.write(ByteBuffer.wrap((ts ++ us).getBytes("ASCII")))
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
connectionHandler.expectNoMessage(100.millis)
|
||||
|
||||
connectionActor ! ResumeReading
|
||||
interestCallReceiver.expectMsg(OP_READ)
|
||||
selector.send(connectionActor, ChannelReadable)
|
||||
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(ts)
|
||||
|
||||
interestCallReceiver.expectNoMsg(100.millis)
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
interestCallReceiver.expectNoMessage(100.millis)
|
||||
connectionHandler.expectNoMessage(100.millis)
|
||||
|
||||
connectionActor ! ResumeReading
|
||||
interestCallReceiver.expectMsg(OP_READ)
|
||||
|
|
@ -391,8 +396,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(us)
|
||||
|
||||
// make sure that after reading all pending data we don't yet register for reading more data
|
||||
interestCallReceiver.expectNoMsg(100.millis)
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
interestCallReceiver.expectNoMessage(100.millis)
|
||||
connectionHandler.expectNoMessage(100.millis)
|
||||
|
||||
val vs = "v" * (maxBufferSize / 2)
|
||||
serverSideChannel.write(ByteBuffer.wrap(vs.getBytes("ASCII")))
|
||||
|
|
@ -438,7 +443,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
run {
|
||||
connectionHandler.send(connectionActor, Close)
|
||||
connectionHandler.expectMsg(Closed)
|
||||
connectionHandler.expectNoMsg(500.millis)
|
||||
connectionHandler.expectNoMessage(500.millis)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -517,12 +522,12 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
connectionHandler.send(connectionActor, writeCmd(Ack))
|
||||
connectionHandler.send(connectionActor, ConfirmedClose)
|
||||
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
connectionHandler.expectNoMessage(100.millis)
|
||||
pullFromServerSide(TestSize)
|
||||
connectionHandler.expectMsg(Ack)
|
||||
|
||||
selector.send(connectionActor, ChannelReadable)
|
||||
connectionHandler.expectNoMsg(100.millis) // not yet
|
||||
connectionHandler.expectNoMessage(100.millis) // not yet
|
||||
|
||||
val buffer = ByteBuffer.allocate(1)
|
||||
serverSelectionKey should be(selectedAs(SelectionKey.OP_READ, 2.seconds))
|
||||
|
|
@ -592,7 +597,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
err.cause should ===(ConnectionResetByPeerMessage)
|
||||
|
||||
// wait a while
|
||||
connectionHandler.expectNoMsg(200.millis)
|
||||
connectionHandler.expectNoMessage(200.millis)
|
||||
|
||||
assertThisConnectionActorTerminated()
|
||||
}
|
||||
|
|
@ -717,7 +722,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
|
||||
// resuming must not immediately work (queue still full)
|
||||
writer.send(connectionActor, ResumeWriting)
|
||||
writer.expectNoMsg(1.second)
|
||||
writer.expectNoMessage(1.second)
|
||||
|
||||
// so drain the queue until it works again
|
||||
while (!writer.msgAvailable) pullFromServerSide(TestSize)
|
||||
|
|
|
|||
|
|
@ -3,3 +3,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdo
|
|||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.this")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.actor.CoordinatedShutdown$Phase$")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.CoordinatedShutdown#Phase.apply")
|
||||
|
||||
# Path based WriteFile command #23902
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.TcpConnection.PendingWriteFile")
|
||||
|
|
@ -17,6 +17,7 @@ import akka.util.{ ByteString, Helpers }
|
|||
import akka.util.Helpers.Requiring
|
||||
import akka.actor._
|
||||
import java.lang.{ Iterable ⇒ JIterable }
|
||||
import java.nio.file.Path
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
|
|
@ -339,6 +340,15 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
|
|||
if (data.isEmpty) empty else Write(data, NoAck)
|
||||
}
|
||||
|
||||
/**
|
||||
* @see [[WritePath]]
|
||||
*/
|
||||
@deprecated("Use WritePath instead", "2.5.10")
|
||||
final case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand {
|
||||
require(position >= 0, "WriteFile.position must be >= 0")
|
||||
require(count > 0, "WriteFile.count must be > 0")
|
||||
}
|
||||
|
||||
/**
|
||||
* Write `count` bytes starting at `position` from file at `filePath` to the connection.
|
||||
* The count must be > 0. The connection actor will reply with a [[CommandFailed]]
|
||||
|
|
@ -349,7 +359,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
|
|||
* or have been sent!</b> Unfortunately there is no way to determine whether
|
||||
* a particular write has been sent by the O/S.
|
||||
*/
|
||||
final case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand {
|
||||
final case class WritePath(path: Path, position: Long, count: Long, ack: Event) extends SimpleWriteCommand {
|
||||
require(position >= 0, "WriteFile.position must be >= 0")
|
||||
require(count > 0, "WriteFile.count must be > 0")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,23 +4,24 @@
|
|||
|
||||
package akka.io
|
||||
|
||||
import java.io.IOException
|
||||
import java.net.{ InetSocketAddress, SocketException }
|
||||
import java.nio.channels.SelectionKey._
|
||||
import java.io.{ FileInputStream, IOException }
|
||||
import java.nio.channels.{ FileChannel, SocketChannel }
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.SelectionKey._
|
||||
import java.nio.channels.{ FileChannel, SocketChannel }
|
||||
import java.nio.file.{ Path, Paths }
|
||||
|
||||
import akka.actor._
|
||||
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
||||
import akka.io.Inet.SocketOption
|
||||
import akka.io.SelectionHandler._
|
||||
import akka.io.Tcp._
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor._
|
||||
import akka.util.ByteString
|
||||
import akka.io.Inet.SocketOption
|
||||
import akka.io.Tcp._
|
||||
import akka.io.SelectionHandler._
|
||||
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
||||
import java.nio.file.Paths
|
||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||
|
||||
/**
|
||||
* Base class for TcpIncomingConnection and TcpOutgoingConnection.
|
||||
|
|
@ -30,9 +31,9 @@ import java.nio.file.Paths
|
|||
private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel, val pullMode: Boolean)
|
||||
extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
|
||||
import TcpConnection._
|
||||
import tcp.Settings._
|
||||
import tcp.bufferPool
|
||||
import TcpConnection._
|
||||
|
||||
private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
|
||||
private[this] var peerClosed = false
|
||||
|
|
@ -378,7 +379,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
head match {
|
||||
case Write.empty ⇒ if (tail eq Write.empty) EmptyPendingWrite else create(tail)
|
||||
case Write(data, ack) if data.nonEmpty ⇒ PendingBufferWrite(commander, data, ack, tail)
|
||||
case WriteFile(path, offset, count, ack) ⇒ PendingWriteFile(commander, path, offset, count, ack, tail)
|
||||
case WriteFile(path, offset, count, ack) ⇒ PendingWriteFile(commander, Paths.get(path), offset, count, ack, tail)
|
||||
case WritePath(path, offset, count, ack) ⇒ PendingWriteFile(commander, path, offset, count, ack, tail)
|
||||
case CompoundWrite(h, t) ⇒ create(h, t)
|
||||
case x @ Write(_, ack) ⇒ // empty write with either an ACK or a non-standard NoACK
|
||||
if (x.wantsAck) commander ! ack
|
||||
|
|
@ -438,9 +440,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
def release(): Unit = bufferPool.release(buffer)
|
||||
}
|
||||
|
||||
def PendingWriteFile(commander: ActorRef, filePath: String, offset: Long, count: Long, ack: Event,
|
||||
def PendingWriteFile(commander: ActorRef, filePath: Path, offset: Long, count: Long, ack: Event,
|
||||
tail: WriteCommand): PendingWriteFile =
|
||||
new PendingWriteFile(commander, FileChannel.open(Paths.get(filePath)), offset, count, ack, tail)
|
||||
new PendingWriteFile(commander, FileChannel.open(filePath), offset, count, ack, tail)
|
||||
|
||||
class PendingWriteFile(
|
||||
val commander: ActorRef,
|
||||
|
|
|
|||
|
|
@ -130,7 +130,8 @@ object Dependencies {
|
|||
|
||||
val testkit = l ++= Seq(Test.junit, Test.scalatest.value) ++ Test.metricsAll
|
||||
|
||||
val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck.value)
|
||||
val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath,
|
||||
Test.mockito, Test.scalacheck.value, Test.jimfs)
|
||||
|
||||
val remote = l ++= Seq(netty, aeronDriver, aeronClient, Test.junit, Test.scalatest.value, Test.jimfs)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue