Fix multi-node-testkit warnings (#26754)
This commit is contained in:
parent
e390f1397b
commit
306187046f
8 changed files with 25 additions and 25 deletions
|
|
@ -12,7 +12,6 @@ import akka.actor.{
|
|||
Address,
|
||||
DeadLetterSuppression,
|
||||
Deploy,
|
||||
FSM,
|
||||
LoggingFSM,
|
||||
NoSerializationVerificationNeeded,
|
||||
OneForOneStrategy,
|
||||
|
|
@ -441,10 +440,10 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
|
|||
override def supervisorStrategy = OneForOneStrategy() {
|
||||
case BarrierTimeout(data) => failBarrier(data)
|
||||
case FailedBarrier(data) => failBarrier(data)
|
||||
case BarrierEmpty(data, msg) => SupervisorStrategy.Resume
|
||||
case BarrierEmpty(_, _) => SupervisorStrategy.Resume
|
||||
case WrongBarrier(name, client, data) => { client ! ToClient(BarrierResult(name, false)); failBarrier(data) }
|
||||
case ClientLost(data, node) => failBarrier(data)
|
||||
case DuplicateNode(data, node) => failBarrier(data)
|
||||
case ClientLost(data, _) => failBarrier(data)
|
||||
case DuplicateNode(data, _) => failBarrier(data)
|
||||
}
|
||||
|
||||
def failBarrier(data: Data): SupervisorStrategy.Directive = {
|
||||
|
|
@ -580,7 +579,6 @@ private[akka] class BarrierCoordinator
|
|||
with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] {
|
||||
import BarrierCoordinator._
|
||||
import Controller._
|
||||
import FSM._
|
||||
|
||||
// this shall be set to true if all subsequent barriers shall fail
|
||||
var failed = false
|
||||
|
|
@ -639,7 +637,7 @@ private[akka] class BarrierCoordinator
|
|||
handleBarrier(d.copy(arrived = together, deadline = enterDeadline))
|
||||
} else
|
||||
handleBarrier(d.copy(arrived = together))
|
||||
case Event(RemoveClient(name), d @ Data(clients, barrier, arrived, _)) =>
|
||||
case Event(RemoveClient(name), d @ Data(clients, _, arrived, _)) =>
|
||||
clients.find(_.name == name) match {
|
||||
case None => stay
|
||||
case Some(client) =>
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import akka.remote.transport.ThrottlerTransportAdapter.{ Blackhole, SetThrottle,
|
|||
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
||||
import akka.util.ccompat._
|
||||
|
||||
@ccompatUsedUntil213
|
||||
object Player {
|
||||
|
||||
final class Waiter extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
|
|
@ -115,7 +116,7 @@ trait Player { this: TestConductorExt =>
|
|||
implicit val timeout = Timeout(barrierTimeout + Settings.QueryTimeout.duration)
|
||||
Await.result(client ? ToServer(EnterBarrier(b, Option(barrierTimeout))), Duration.Inf)
|
||||
} catch {
|
||||
case e: AskTimeoutException =>
|
||||
case _: AskTimeoutException =>
|
||||
client ! ToServer(FailBarrier(b))
|
||||
// Why don't TimeoutException have a constructor that takes a cause?
|
||||
throw new TimeoutException("Client timed out while waiting for barrier " + b);
|
||||
|
|
@ -184,7 +185,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
startWith(Connecting, Data(None, None))
|
||||
|
||||
when(Connecting, stateTimeout = settings.ConnectTimeout) {
|
||||
case Event(msg: ClientOp, _) =>
|
||||
case Event(_: ClientOp, _) =>
|
||||
stay.replying(Status.Failure(new IllegalStateException("not connected yet")))
|
||||
case Event(Connected(channel), _) =>
|
||||
channel.write(Hello(name.name, TestConductor().address))
|
||||
|
|
@ -204,7 +205,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
case Event(msg: NetworkOp, _) =>
|
||||
log.error("received {} instead of Done", msg)
|
||||
goto(Failed)
|
||||
case Event(msg: ServerOp, _) =>
|
||||
case Event(_: ServerOp, _) =>
|
||||
stay.replying(Status.Failure(new IllegalStateException("not connected yet")))
|
||||
case Event(StateTimeout, _) =>
|
||||
log.error("connect timeout to TestConductor")
|
||||
|
|
@ -221,15 +222,15 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
case Event(ToServer(msg), d @ Data(Some(channel), None)) =>
|
||||
channel.write(msg)
|
||||
val token = msg match {
|
||||
case EnterBarrier(barrier, timeout) => Some(barrier -> sender())
|
||||
case GetAddress(node) => Some(node.name -> sender())
|
||||
case _ => None
|
||||
case EnterBarrier(barrier, _) => Some(barrier -> sender())
|
||||
case GetAddress(node) => Some(node.name -> sender())
|
||||
case _ => None
|
||||
}
|
||||
stay.using(d.copy(runningOp = token))
|
||||
case Event(ToServer(op), Data(channel, Some((token, _)))) =>
|
||||
case Event(ToServer(op), Data(_, Some((token, _)))) =>
|
||||
log.error("cannot write {} while waiting for {}", op, token)
|
||||
stay
|
||||
case Event(op: ClientOp, d @ Data(Some(channel), runningOp)) =>
|
||||
case Event(op: ClientOp, d @ Data(Some(channel @ _), runningOp)) =>
|
||||
op match {
|
||||
case BarrierResult(b, success) =>
|
||||
runningOp match {
|
||||
|
|
@ -244,7 +245,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
log.warning("did not expect {}", op)
|
||||
}
|
||||
stay.using(d.copy(runningOp = None))
|
||||
case AddressReply(node, address) =>
|
||||
case AddressReply(_, address) =>
|
||||
runningOp match {
|
||||
case Some((_, requester)) => requester ! address
|
||||
case None => log.warning("did not expect {}", op)
|
||||
|
|
@ -273,7 +274,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
"adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig")
|
||||
}
|
||||
stay
|
||||
case d: DisconnectMsg =>
|
||||
case _: DisconnectMsg =>
|
||||
// FIXME: Currently ignoring, needs support from Remoting
|
||||
stay
|
||||
case TerminateMsg(Left(false)) =>
|
||||
|
|
@ -340,7 +341,7 @@ private[akka] class PlayerHandler(
|
|||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
log.debug("channel {} exception {}", event.getChannel, event.getCause)
|
||||
event.getCause match {
|
||||
case c: ConnectException if reconnects > 0 =>
|
||||
case _: ConnectException if reconnects > 0 =>
|
||||
reconnects -= 1
|
||||
scheduler.scheduleOnce(nextAttempt.timeLeft)(reconnect())
|
||||
case e => fsm ! ConnectionFailure(e.getMessage)
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import akka.util.ccompat._
|
|||
/**
|
||||
* Configure the role names and participants of the test, including configuration settings.
|
||||
*/
|
||||
@ccompatUsedUntil213
|
||||
abstract class MultiNodeConfig {
|
||||
|
||||
private var _commonConf: Option[Config] = None
|
||||
|
|
@ -437,7 +438,8 @@ abstract class MultiNodeSpec(
|
|||
|
||||
// now add deployments, if so desired
|
||||
|
||||
private final case class Replacement(tag: String, role: RoleName) {
|
||||
// Cannot be final because of https://github.com/scala/bug/issues/4440
|
||||
private case class Replacement(tag: String, role: RoleName) {
|
||||
lazy val addr = node(role).address.toString
|
||||
}
|
||||
|
||||
|
|
@ -450,7 +452,7 @@ abstract class MultiNodeSpec(
|
|||
case (base, r @ Replacement(tag, _)) =>
|
||||
base.indexOf(tag) match {
|
||||
case -1 => base
|
||||
case start =>
|
||||
case _ =>
|
||||
val replaceWith = try r.addr
|
||||
catch {
|
||||
case NonFatal(e) =>
|
||||
|
|
|
|||
|
|
@ -22,12 +22,12 @@ private[akka] trait PerfFlamesSupport { _: MultiNodeSpec =>
|
|||
*
|
||||
* Options are currently to be passed in via `export PERF_MAP_OPTIONS` etc.
|
||||
*/
|
||||
def runPerfFlames(nodes: RoleName*)(delay: FiniteDuration, time: FiniteDuration = 15.seconds): Unit = {
|
||||
def runPerfFlames(nodes: RoleName*)(delay: FiniteDuration): Unit = {
|
||||
if (isPerfJavaFlamesAvailable && isNode(nodes: _*)) {
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
|
||||
val afterDelay = akka.pattern.after(delay, system.scheduler)(Future.successful("GO!"))
|
||||
afterDelay.onComplete { it =>
|
||||
afterDelay.onComplete { _ =>
|
||||
import java.lang.management._
|
||||
val name = ManagementFactory.getRuntimeMXBean.getName
|
||||
val pid = name.substring(0, name.indexOf('@')).toInt
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue