diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6351083101..de5959726b 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -554,6 +554,11 @@ akka { # Decreasing the value may improve fairness while increasing may improve # throughput. file-io-transferTo-limit = 512 KiB + + # The number of times to retry the `finishConnect` call after being notified about + # OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that + # `finishConnect` will succeed, which is the case on Android. + finish-connect-retries = 5 } udp { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index f97c4a2f43..c2b8a311d3 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -460,6 +460,8 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { } val MaxChannelsPerSelector: Int = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1) + val FinishConnectRetries: Int = getInt("finish-connect-retries") requiring (_ > 0, + "finish-connect-retries must be > 0") private[this] def getIntBytes(path: String): Int = { val size = getBytes(path) diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 9c2daa29e4..6ac141b3cc 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -8,7 +8,7 @@ import java.io.IOException import java.nio.channels.{ SelectionKey, SocketChannel } import java.net.ConnectException import scala.collection.immutable -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import akka.actor.{ ReceiveTimeout, ActorRef } import akka.io.Inet.SocketOption import akka.io.TcpConnection.CloseInformation @@ -55,19 +55,31 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, if (channel.connect(remoteAddress)) completeConnect(registration, commander, options) else - context.become(connecting(registration, commander, options)) + context.become(connecting(registration, commander, options, tcp.Settings.FinishConnectRetries)) } } def connecting(registration: ChannelRegistration, commander: ActorRef, - options: immutable.Traversable[SocketOption]): Receive = { + options: immutable.Traversable[SocketOption], remainingFinishConnectRetries: Int): Receive = { { case ChannelConnectable ⇒ - if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout reportConnectFailure { - channel.finishConnect() || (throw new ConnectException(s"Connection to [$remoteAddress] failed")) - log.debug("Connection established to [{}]", remoteAddress) - completeConnect(registration, commander, options) + if (channel.finishConnect()) { + if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout + log.debug("Connection established to [{}]", remoteAddress) + completeConnect(registration, commander, options) + } else { + if (remainingFinishConnectRetries > 0) { + context.system.scheduler.scheduleOnce(1.millisecond) { + channelRegistry.register(channel, SelectionKey.OP_CONNECT) + }(context.dispatcher) + context.become(connecting(registration, commander, options, remainingFinishConnectRetries - 1)) + } else { + log.debug("Could not establish connection because finishConnect " + + "never returned true (consider increasing akka.io.tcp.finish-connect-retries)") + stop() + } + } } case ReceiveTimeout ⇒