Merge branch '1182'

Conflicts:
	akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala
This commit is contained in:
Endre Sándor Varga 2013-02-25 16:37:04 +01:00
commit 6321663471
3 changed files with 39 additions and 27 deletions

View file

@ -298,7 +298,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
* has hit the network medium. The only exception is when you disable the Winsock buffering by setting
* SO_SNDBUF to 0."
*/
"close the connection and reply with `ConfirmedClosed` upong reception of an `ConfirmedClose` command (simplified)" in withEstablishedConnection(setSmallRcvBuffer) { setup
"close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command (simplified)" in withEstablishedConnection(setSmallRcvBuffer) { setup
import setup._
// we should test here that a pending write command is properly finished first
@ -327,7 +327,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
assertThisConnectionActorTerminated()
}
"close the connection and reply with `ConfirmedClosed` upong reception of an `ConfirmedClose` command" in withEstablishedConnection(setSmallRcvBuffer) { setup
"close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command" in withEstablishedConnection(setSmallRcvBuffer) { setup
ignoreIfWindows()
import setup._

View file

@ -5,13 +5,13 @@
package akka.io
import java.net.Socket
import java.nio.channels.SocketChannel
import scala.concurrent.duration._
import akka.actor.{ Terminated, SupervisorStrategy, Actor, Props }
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec }
import Tcp._
import akka.testkit.EventFilter
import akka.io.SelectionHandler._
import java.nio.channels.SelectionKey._
import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming }
class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
@ -32,27 +32,35 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
attemptConnectionToEndpoint()
attemptConnectionToEndpoint()
def expectWorkerForCommand: Unit = {
selectorRouter.expectMsgPF() {
case WorkerForCommand(RegisterIncoming(chan), commander, _)
chan.isOpen must be(true)
commander must be === listener
}
}
// since the batch-accept-limit is 2 we must only receive 2 accepted connections
listener ! ChannelAcceptable
parent.expectMsg(AcceptInterest)
expectWorkerForCommand
expectWorkerForCommand
selectorRouter.expectNoMsg(100.millis)
parent.expectMsg(AcceptInterest)
// and pick up the last remaining connection on the next ChannelAcceptable
listener ! ChannelAcceptable
expectWorkerForCommand
}
"continue to accept connections after a previous accept" in new TestSetup {
bindListener()
attemptConnectionToEndpoint()
listener ! ChannelAcceptable
expectWorkerForCommand
selectorRouter.expectNoMsg(100.millis)
parent.expectMsg(AcceptInterest)
attemptConnectionToEndpoint()
listener ! ChannelAcceptable
expectWorkerForCommand
selectorRouter.expectNoMsg(100.millis)
parent.expectMsg(AcceptInterest)
}
"react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup {
bindListener()
@ -69,12 +77,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
attemptConnectionToEndpoint()
listener ! ChannelAcceptable
val channel = selectorRouter.expectMsgPF() {
case WorkerForCommand(RegisterIncoming(chan), commander, _)
chan.isOpen must be(true)
commander must be === listener
chan
}
val channel = expectWorkerForCommand
EventFilter.warning(pattern = "selector capacity limit", occurrences = 1) intercept {
listener ! FailedRegisterIncoming(channel)
@ -105,6 +108,14 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
def listener = parentRef.underlyingActor.listener
def expectWorkerForCommand: SocketChannel =
selectorRouter.expectMsgPF() {
case WorkerForCommand(RegisterIncoming(chan), commander, _)
chan.isOpen must be(true)
commander must be === listener
chan
}
private class ListenerParent extends Actor {
val listener = context.actorOf(
props = Props(new TcpListener(selectorRouter.ref, Tcp(system), bindCommander.ref, Bind(handler.ref, endpoint, 100, Nil))),

View file

@ -84,20 +84,21 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
context.stop(self)
}
@tailrec final def acceptAllPending(limit: Int): Unit =
if (limit > 0) {
@tailrec final def acceptAllPending(limit: Int): Unit = {
val socketChannel =
if (limit > 0) {
try channel.accept()
catch {
case NonFatal(e) log.error(e, "Accept error: could not accept new connection due to {}", e); null
}
} else null
if (socketChannel != null) {
log.debug("New connection accepted")
socketChannel.configureBlocking(false)
selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, Props(new TcpIncomingConnection(socketChannel, tcp, handler, options)))
acceptAllPending(limit - 1)
}
} else context.parent ! AcceptInterest
}
override def postStop() {
try {