=act #3525 Retry finishConnect after OP_CONNECT

Patch picked from 63648f1780
This commit is contained in:
Johannes Rudolph 2013-08-23 15:33:52 +02:00 committed by Patrik Nordwall
parent cdea2af973
commit 0a88fbefae
3 changed files with 26 additions and 7 deletions

View file

@ -554,6 +554,11 @@ akka {
# Decreasing the value may improve fairness while increasing may improve
# throughput.
file-io-transferTo-limit = 512 KiB
# The number of times to retry the `finishConnect` call after being notified about
# OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that
# `finishConnect` will succeed, which is the case on Android.
finish-connect-retries = 5
}
udp {

View file

@ -460,6 +460,8 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
}
val MaxChannelsPerSelector: Int = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1)
val FinishConnectRetries: Int = getInt("finish-connect-retries") requiring (_ > 0,
"finish-connect-retries must be > 0")
private[this] def getIntBytes(path: String): Int = {
val size = getBytes(path)

View file

@ -8,7 +8,7 @@ import java.io.IOException
import java.nio.channels.{ SelectionKey, SocketChannel }
import java.net.ConnectException
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import akka.actor.{ ReceiveTimeout, ActorRef }
import akka.io.Inet.SocketOption
import akka.io.TcpConnection.CloseInformation
@ -55,19 +55,31 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
if (channel.connect(remoteAddress))
completeConnect(registration, commander, options)
else
context.become(connecting(registration, commander, options))
context.become(connecting(registration, commander, options, tcp.Settings.FinishConnectRetries))
}
}
def connecting(registration: ChannelRegistration, commander: ActorRef,
options: immutable.Traversable[SocketOption]): Receive = {
options: immutable.Traversable[SocketOption], remainingFinishConnectRetries: Int): Receive = {
{
case ChannelConnectable
if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout
reportConnectFailure {
channel.finishConnect() || (throw new ConnectException(s"Connection to [$remoteAddress] failed"))
log.debug("Connection established to [{}]", remoteAddress)
completeConnect(registration, commander, options)
if (channel.finishConnect()) {
if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout
log.debug("Connection established to [{}]", remoteAddress)
completeConnect(registration, commander, options)
} else {
if (remainingFinishConnectRetries > 0) {
context.system.scheduler.scheduleOnce(1.millisecond) {
channelRegistry.register(channel, SelectionKey.OP_CONNECT)
}(context.dispatcher)
context.become(connecting(registration, commander, options, remainingFinishConnectRetries - 1))
} else {
log.debug("Could not establish connection because finishConnect " +
"never returned true (consider increasing akka.io.tcp.finish-connect-retries)")
stop()
}
}
}
case ReceiveTimeout