=sbt Do retry connection in Player.
This commit is contained in:
parent
4f570ea24c
commit
262d485d56
3 changed files with 22 additions and 34 deletions
|
|
@ -22,11 +22,13 @@ import scala.collection.immutable
|
|||
import scala.concurrent.{ Await, ExecutionContext, Future }
|
||||
import scala.concurrent.duration._
|
||||
import scala.reflect.classTag
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.util.control.NoStackTrace
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter }
|
||||
import io.netty.channel.ChannelHandler.Sharable
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.actor._
|
||||
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
||||
|
|
@ -338,21 +340,36 @@ private[pekko] class PlayerHandler(
|
|||
|
||||
import ClientFSM._
|
||||
|
||||
val connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection](reconnect())
|
||||
val connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection]()
|
||||
|
||||
var nextAttempt: Deadline = _
|
||||
|
||||
tryConnectToController()
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
|
||||
log.error("channel {} exception {}", ctx.channel(), cause)
|
||||
cause match {
|
||||
case _: ConnectException if reconnects > 0 =>
|
||||
reconnects -= 1
|
||||
scheduler.scheduleOnce(nextAttempt.timeLeft)(connectionRef.set(reconnect()))
|
||||
case e => fsm ! ConnectionFailure(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
private def tryConnectToController(): Unit = {
|
||||
Try(reconnect()) match {
|
||||
case Success(r) => connectionRef.set(r)
|
||||
case Failure(ex) =>
|
||||
log.error("Error when try to connect to remote addr:[{}] will retry, time left:[{}], cause:[{}].",
|
||||
server, nextAttempt.timeLeft, ex.getMessage)
|
||||
scheduleReconnect()
|
||||
}
|
||||
}
|
||||
|
||||
private def scheduleReconnect(): Unit = {
|
||||
scheduler.scheduleOnce(nextAttempt.timeLeft)(tryConnectToController())
|
||||
}
|
||||
|
||||
private def reconnect(): RemoteConnection = {
|
||||
nextAttempt = Deadline.now + backoff
|
||||
RemoteConnection(Client, server, poolSize, this)
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import io.netty.handler.codec.{
|
|||
MessageToMessageDecoder,
|
||||
MessageToMessageEncoder
|
||||
}
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.protobufv3.internal.Message
|
||||
import pekko.util.Helpers
|
||||
|
|
@ -130,6 +131,8 @@ private[pekko] object RemoteConnection {
|
|||
.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
|
||||
.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
|
||||
.connect(sockaddr)
|
||||
.sync()
|
||||
|
||||
new RemoteConnection {
|
||||
override def channelFuture: ChannelFuture = cf
|
||||
|
||||
|
|
|
|||
|
|
@ -22,9 +22,6 @@ import sbtassembly.AssemblyPlugin.assemblySettings
|
|||
import sbtassembly.{ AssemblyKeys, MergeStrategy }
|
||||
import AssemblyKeys._
|
||||
|
||||
import java.net.{ InetSocketAddress, Socket }
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
object MultiJvmPlugin extends AutoPlugin {
|
||||
|
||||
case class Options(jvm: Seq[String], extra: String => Seq[String], run: String => Seq[String])
|
||||
|
|
@ -375,40 +372,11 @@ object MultiJvmPlugin extends AutoPlugin {
|
|||
log.debug("Starting %s for %s".format(jvmName, testClass))
|
||||
log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" ")))
|
||||
val testClass2Process = (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions, jvmLogger, connectInput))
|
||||
if (index == 0) {
|
||||
log.debug("%s for %s 's started as `Controller`, waiting before can be connected for clients.".format(jvmName,
|
||||
testClass))
|
||||
val controllerHost = hosts.head
|
||||
val serverPort: Int = Integer.getInteger("multinode.server-port", 4711)
|
||||
waitingBeforeConnectable(controllerHost, serverPort, TimeUnit.SECONDS.toMillis(20L))
|
||||
}
|
||||
testClass2Process
|
||||
}
|
||||
processExitCodes(name, processes, log)
|
||||
}
|
||||
|
||||
private def waitingBeforeConnectable(host: String, port: Int, timeoutInMillis: Long): Unit = {
|
||||
val inetSocketAddress = new InetSocketAddress(host, port)
|
||||
def telnet(addr: InetSocketAddress, timeout: Int): Boolean = {
|
||||
val socket: Socket = new Socket()
|
||||
try {
|
||||
socket.connect(inetSocketAddress, timeout)
|
||||
socket.isConnected
|
||||
} catch {
|
||||
case _: Exception => false
|
||||
} finally {
|
||||
socket.close()
|
||||
}
|
||||
}
|
||||
|
||||
val startTime = System.currentTimeMillis()
|
||||
var connectivity = false
|
||||
while (!connectivity && (System.currentTimeMillis() - startTime < timeoutInMillis)) {
|
||||
connectivity = telnet(inetSocketAddress, 1000)
|
||||
TimeUnit.MILLISECONDS.sleep(100)
|
||||
}
|
||||
}
|
||||
|
||||
def processExitCodes(name: String, processes: Seq[(String, Process)], log: Logger): (String, sbt.TestResult) = {
|
||||
val exitCodes = processes.map {
|
||||
case (testClass, process) => (testClass, process.exitValue())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue