diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala index 9697219ed0..3977a8b7dc 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala @@ -22,11 +22,13 @@ import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future } import scala.concurrent.duration._ import scala.reflect.classTag +import scala.util.{ Failure, Success, Try } import scala.util.control.NoStackTrace import scala.util.control.NonFatal import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter } import io.netty.channel.ChannelHandler.Sharable + import org.apache.pekko import pekko.actor._ import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } @@ -338,21 +340,36 @@ private[pekko] class PlayerHandler( import ClientFSM._ - val connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection](reconnect()) + val connectionRef: AtomicReference[RemoteConnection] = new AtomicReference[RemoteConnection]() var nextAttempt: Deadline = _ + tryConnectToController() + @nowarn("msg=deprecated") override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { log.error("channel {} exception {}", ctx.channel(), cause) cause match { case _: ConnectException if reconnects > 0 => reconnects -= 1 - scheduler.scheduleOnce(nextAttempt.timeLeft)(connectionRef.set(reconnect())) case e => fsm ! ConnectionFailure(e.getMessage) } } + private def tryConnectToController(): Unit = { + Try(reconnect()) match { + case Success(r) => connectionRef.set(r) + case Failure(ex) => + log.error("Error when try to connect to remote addr:[{}] will retry, time left:[{}], cause:[{}].", + server, nextAttempt.timeLeft, ex.getMessage) + scheduleReconnect() + } + } + + private def scheduleReconnect(): Unit = { + scheduler.scheduleOnce(nextAttempt.timeLeft)(tryConnectToController()) + } + private def reconnect(): RemoteConnection = { nextAttempt = Deadline.now + backoff RemoteConnection(Client, server, poolSize, this) diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala index 945deff836..99801878bf 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala @@ -31,6 +31,7 @@ import io.netty.handler.codec.{ MessageToMessageDecoder, MessageToMessageEncoder } + import org.apache.pekko import pekko.protobufv3.internal.Message import pekko.util.Helpers @@ -130,6 +131,8 @@ private[pekko] object RemoteConnection { .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true) .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true) .connect(sockaddr) + .sync() + new RemoteConnection { override def channelFuture: ChannelFuture = cf diff --git a/project/SbtMultiJvm.scala b/project/SbtMultiJvm.scala index b05cf84701..e83c038746 100644 --- a/project/SbtMultiJvm.scala +++ b/project/SbtMultiJvm.scala @@ -22,9 +22,6 @@ import sbtassembly.AssemblyPlugin.assemblySettings import sbtassembly.{ AssemblyKeys, MergeStrategy } import AssemblyKeys._ -import java.net.{ InetSocketAddress, Socket } -import java.util.concurrent.TimeUnit - object MultiJvmPlugin extends AutoPlugin { case class Options(jvm: Seq[String], extra: String => Seq[String], run: String => Seq[String]) @@ -375,40 +372,11 @@ object MultiJvmPlugin extends AutoPlugin { log.debug("Starting %s for %s".format(jvmName, testClass)) log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" "))) val testClass2Process = (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions, jvmLogger, connectInput)) - if (index == 0) { - log.debug("%s for %s 's started as `Controller`, waiting before can be connected for clients.".format(jvmName, - testClass)) - val controllerHost = hosts.head - val serverPort: Int = Integer.getInteger("multinode.server-port", 4711) - waitingBeforeConnectable(controllerHost, serverPort, TimeUnit.SECONDS.toMillis(20L)) - } testClass2Process } processExitCodes(name, processes, log) } - private def waitingBeforeConnectable(host: String, port: Int, timeoutInMillis: Long): Unit = { - val inetSocketAddress = new InetSocketAddress(host, port) - def telnet(addr: InetSocketAddress, timeout: Int): Boolean = { - val socket: Socket = new Socket() - try { - socket.connect(inetSocketAddress, timeout) - socket.isConnected - } catch { - case _: Exception => false - } finally { - socket.close() - } - } - - val startTime = System.currentTimeMillis() - var connectivity = false - while (!connectivity && (System.currentTimeMillis() - startTime < timeoutInMillis)) { - connectivity = telnet(inetSocketAddress, 1000) - TimeUnit.MILLISECONDS.sleep(100) - } - } - def processExitCodes(name: String, processes: Seq[(String, Process)], log: Logger): (String, sbt.TestResult) = { val exitCodes = processes.map { case (testClass, process) => (testClass, process.exitValue())