Merge pull request #1548 from spray/wip/io-fix-configured-noacks
io: fix rare bug in TcpConnection causing the sending of certain `NoAck`...
This commit is contained in:
commit
b044b6b5a9
2 changed files with 23 additions and 9 deletions
|
|
@ -143,7 +143,6 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
|
|
||||||
"write data to network (and acknowledge)" in new EstablishedConnectionTest() {
|
"write data to network (and acknowledge)" in new EstablishedConnectionTest() {
|
||||||
run {
|
run {
|
||||||
object Ack extends Event
|
|
||||||
val writer = TestProbe()
|
val writer = TestProbe()
|
||||||
|
|
||||||
// directly acknowledge an empty write
|
// directly acknowledge an empty write
|
||||||
|
|
@ -175,16 +174,30 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
|
|
||||||
"write data after not acknowledged data" in new EstablishedConnectionTest() {
|
"write data after not acknowledged data" in new EstablishedConnectionTest() {
|
||||||
run {
|
run {
|
||||||
object Ack extends Event
|
|
||||||
val writer = TestProbe()
|
val writer = TestProbe()
|
||||||
writer.send(connectionActor, Write(ByteString(42.toByte)))
|
writer.send(connectionActor, Write(ByteString(42.toByte)))
|
||||||
writer.expectNoMsg(500.millis)
|
writer.expectNoMsg(500.millis)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"acknowledge the completion of an ACKed empty write" in new EstablishedConnectionTest() {
|
||||||
|
run {
|
||||||
|
val writer = TestProbe()
|
||||||
writer.send(connectionActor, Write(ByteString.empty, Ack))
|
writer.send(connectionActor, Write(ByteString.empty, Ack))
|
||||||
writer.expectMsg(Ack)
|
writer.expectMsg(Ack)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not acknowledge the completion of a NACKed empty write" in new EstablishedConnectionTest() {
|
||||||
|
run {
|
||||||
|
val writer = TestProbe()
|
||||||
|
writer.send(connectionActor, Write(ByteString.empty, NoAck))
|
||||||
|
writer.expectNoMsg(250.millis)
|
||||||
|
writer.send(connectionActor, Write(ByteString.empty, NoAck(42)))
|
||||||
|
writer.expectNoMsg(250.millis)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"write file to network" in new EstablishedConnectionTest() {
|
"write file to network" in new EstablishedConnectionTest() {
|
||||||
run {
|
run {
|
||||||
// hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath
|
// hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath
|
||||||
|
|
@ -200,7 +213,6 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
// maximum of 100 MB
|
// maximum of 100 MB
|
||||||
val size = math.min(testFile.length(), 100000000).toInt
|
val size = math.min(testFile.length(), 100000000).toInt
|
||||||
|
|
||||||
object Ack extends Event
|
|
||||||
val writer = TestProbe()
|
val writer = TestProbe()
|
||||||
writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack))
|
writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack))
|
||||||
pullFromServerSide(size, 1000000)
|
pullFromServerSide(size, 1000000)
|
||||||
|
|
@ -284,7 +296,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
new EstablishedConnectionTest() with SmallRcvBuffer {
|
new EstablishedConnectionTest() with SmallRcvBuffer {
|
||||||
run {
|
run {
|
||||||
// we should test here that a pending write command is properly finished first
|
// we should test here that a pending write command is properly finished first
|
||||||
object Ack extends Event
|
|
||||||
// set an artificially small send buffer size so that the write is queued
|
// set an artificially small send buffer size so that the write is queued
|
||||||
// inside the connection actor
|
// inside the connection actor
|
||||||
clientSideChannel.socket.setSendBufferSize(1024)
|
clientSideChannel.socket.setSendBufferSize(1024)
|
||||||
|
|
@ -346,7 +358,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
new EstablishedConnectionTest() with SmallRcvBuffer {
|
new EstablishedConnectionTest() with SmallRcvBuffer {
|
||||||
run {
|
run {
|
||||||
// we should test here that a pending write command is properly finished first
|
// we should test here that a pending write command is properly finished first
|
||||||
object Ack extends Event
|
|
||||||
// set an artificially small send buffer size so that the write is queued
|
// set an artificially small send buffer size so that the write is queued
|
||||||
// inside the connection actor
|
// inside the connection actor
|
||||||
clientSideChannel.socket.setSendBufferSize(1024)
|
clientSideChannel.socket.setSendBufferSize(1024)
|
||||||
|
|
@ -379,7 +391,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
ignoreIfWindows()
|
ignoreIfWindows()
|
||||||
|
|
||||||
// we should test here that a pending write command is properly finished first
|
// we should test here that a pending write command is properly finished first
|
||||||
object Ack extends Event
|
|
||||||
// set an artificially small send buffer size so that the write is queued
|
// set an artificially small send buffer size so that the write is queued
|
||||||
// inside the connection actor
|
// inside the connection actor
|
||||||
clientSideChannel.socket.setSendBufferSize(1024)
|
clientSideChannel.socket.setSendBufferSize(1024)
|
||||||
|
|
@ -426,7 +438,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
|
|
||||||
selector.send(connectionActor, ChannelReadable)
|
selector.send(connectionActor, ChannelReadable)
|
||||||
connectionHandler.expectMsg(PeerClosed)
|
connectionHandler.expectMsg(PeerClosed)
|
||||||
object Ack extends Event
|
|
||||||
connectionHandler.send(connectionActor, writeCmd(Ack))
|
connectionHandler.send(connectionActor, writeCmd(Ack))
|
||||||
pullFromServerSide(TestSize)
|
pullFromServerSide(TestSize)
|
||||||
connectionHandler.expectMsg(Ack)
|
connectionHandler.expectMsg(Ack)
|
||||||
|
|
@ -444,7 +456,7 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
|
|
||||||
selector.send(connectionActor, ChannelReadable)
|
selector.send(connectionActor, ChannelReadable)
|
||||||
connectionHandler.expectMsg(PeerClosed)
|
connectionHandler.expectMsg(PeerClosed)
|
||||||
object Ack extends Event
|
|
||||||
connectionHandler.send(connectionActor, writeCmd(Ack))
|
connectionHandler.send(connectionActor, writeCmd(Ack))
|
||||||
pullFromServerSide(TestSize)
|
pullFromServerSide(TestSize)
|
||||||
connectionHandler.expectMsg(Ack)
|
connectionHandler.expectMsg(Ack)
|
||||||
|
|
@ -890,4 +902,6 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
channel.close()
|
channel.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object Ack extends Event
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -132,7 +132,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
|
|
||||||
} else write match {
|
} else write match {
|
||||||
case Write(data, ack) if data.isEmpty ⇒
|
case Write(data, ack) if data.isEmpty ⇒
|
||||||
if (ack != NoAck) sender ! ack
|
if (write.wantsAck) sender ! ack
|
||||||
|
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
pendingWrite = createWrite(write)
|
pendingWrite = createWrite(write)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue