diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index a5d2ceb58d..156552ed1a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -59,10 +59,6 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { akka.remote.netty.backoff-timeout = 500ms akka.remote.netty.connection-timeout = 500ms - # don't use testconductor transport in this test, especially not - # when using use-dispatcher-for-io - akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" - # Using a separate dispatcher for netty io doesn't reduce number # of needed threads # akka.remote.netty.use-dispatcher-for-io=akka.test.io-dispatcher diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index a7a7c6f4ba..95b362e6b3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -9,10 +9,11 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ +import scala.concurrent.duration._ import akka.actor.Address -import akka.remote.testconductor.Direction import scala.concurrent.duration._ import scala.collection.immutable +import akka.remote.transport.ThrottlerTransportAdapter.Direction case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -23,6 +24,7 @@ case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends Mul commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" + akka.remoting.retry-latch-closed-for = 3 s akka.cluster { auto-down = on failure-detector.threshold = 4 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 395d803865..23c0b5009d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -10,10 +10,12 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ +import com.typesafe.config.ConfigFactory import akka.actor.Address -import akka.remote.testconductor.{ RoleName, Direction } +import akka.remote.testconductor.RoleName import scala.concurrent.duration._ import scala.collection.immutable +import akka.remote.transport.ThrottlerTransportAdapter.Direction case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") @@ -21,7 +23,12 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(ConfigFactory.parseString( + """ + akka.remoting.log-remote-lifecycle-events = off + akka.cluster.publish-stats-interval = 0s + akka.loglevel = INFO + """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))) testTransport(on = true) } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 25837cbb71..2f3df73be6 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -24,28 +24,7 @@ import akka.util.{ Timeout } import scala.reflect.classTag import akka.ConfigurationException import akka.AkkaException - -sealed trait Direction { - def includes(other: Direction): Boolean -} - -object Direction { - case object Send extends Direction { - override def includes(other: Direction): Boolean = other match { - case Send ⇒ true - case _ ⇒ false - } - } - case object Receive extends Direction { - override def includes(other: Direction): Boolean = other match { - case Receive ⇒ true - case _ ⇒ false - } - } - case object Both extends Direction { - override def includes(other: Direction): Boolean = true - } -} +import akka.remote.transport.ThrottlerTransportAdapter.Direction /** * The conductor is the one orchestrating the test: it governs the @@ -149,10 +128,9 @@ trait Conductor { this: TestConductorExt ⇒ controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done] } - private def requireTestConductorTranport(): Unit = - if (!transport.isInstanceOf[TestConductorTransport]) - throw new ConfigurationException("To use this feature you must activate the TestConductorTranport by " + - "specifying `testTransport(on = true)` in your MultiNodeConfig.") + private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl.")) + throw new ConfigurationException("To use this feature you must activate the failure injector adapters "+ + "(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.") /** * Switch the Netty pipeline of the remote support into pass through mode for diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala index cbe0825f35..fd2010ab93 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/DataTypes.scala @@ -14,6 +14,7 @@ import akka.actor.Address import org.jboss.netty.handler.codec.oneone.OneToOneDecoder import scala.concurrent.duration._ import akka.remote.testconductor.TestConductorProtocol.BarrierOp +import akka.remote.transport.ThrottlerTransportAdapter.Direction case class RoleName(name: String) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala index 479bc38b1d..4a054fdff4 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala @@ -74,12 +74,4 @@ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with C */ val address = transport.defaultAddress - /** - * INTERNAL API. - * - * [[akka.remote.testconductor.NetworkFailureInjector]]s register themselves here so that - * failures can be injected. - */ - private[akka] val failureInjector = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[FailureInjector], "FailureInjector") - } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala deleted file mode 100644 index 26d6f6f243..0000000000 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala +++ /dev/null @@ -1,385 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.remote.testconductor - -import language.postfixOps -import java.net.InetSocketAddress -import scala.annotation.tailrec -import scala.collection.immutable -import scala.concurrent.duration._ -import org.jboss.netty.buffer.ChannelBuffer -import org.jboss.netty.channel.{ SimpleChannelHandler, MessageEvent, Channels, ChannelStateEvent, ChannelHandlerContext, ChannelFutureListener, ChannelFuture } -import akka.actor.{ Props, LoggingFSM, Address, ActorSystem, ActorRef, ActorLogging, Actor, FSM } -import akka.event.Logging -import akka.remote.netty.ChannelAddress - -/** - * INTERNAL API. - */ -private[akka] class FailureInjector extends Actor with ActorLogging { - import ThrottleActor._ - import NetworkFailureInjector._ - - case class ChannelSettings( - ctx: Option[ChannelHandlerContext] = None, - throttleSend: Option[SetRate] = None, - throttleReceive: Option[SetRate] = None) - case class Injectors(sender: ActorRef, receiver: ActorRef, known: Boolean) - - var channels = Map[ChannelHandlerContext, Injectors]() - var settings = Map[Address, ChannelSettings]() - var generation = Iterator from 1 - - /** - * Only for a NEW ctx, start ThrottleActors, prime them and update all maps. - */ - def ingestContextAddress(ctx: ChannelHandlerContext, addr: Address, known: Boolean, - snd: Option[ActorRef] = None, rcv: Option[ActorRef] = None): Injectors = { - lazy val gen = generation.next - val name = addr.host.get + ":" + addr.port.get - val thrSend = snd getOrElse context.actorOf(Props(new ThrottleActor(ctx)), name + "-snd" + gen) - val thrRecv = rcv getOrElse context.actorOf(Props(new ThrottleActor(ctx)), name + "-rcv" + gen) - val injectors = Injectors(thrSend, thrRecv, known) - channels += ctx -> injectors - settings += addr -> (settings get addr map { - case c @ ChannelSettings(prevCtx, ts, tr) ⇒ - ts foreach (thrSend ! _) - tr foreach (thrRecv ! _) - prevCtx match { - case Some(p) ⇒ log.warning("installing context {} instead of {} for address {}", ctx, p, addr) - case None ⇒ // okay - } - c.copy(ctx = Some(ctx)) - } getOrElse ChannelSettings(Some(ctx))) - injectors - } - - /** - * Retrieve target settings, also if they were sketchy before (i.e. no system name). - * In the latter case, copy settings from the sketchy address and remove the old - * mapping. - */ - def retrieveTargetSettings(target: Address): Option[ChannelSettings] = { - settings get target orElse { - val host = target.host - val port = target.port - settings find { - case (Address("akka", "", `host`, `port`), s) ⇒ true - case _ ⇒ false - } map { - case (a, s) ⇒ settings -= a; settings += target -> s; s - } - } - } - - def receive = { - /* - * If a channel handler tells us that he’s been disconnected, stop the - * throttle actors and forget about them (but not possibly applied settings) - */ - case RemoveContext(ctx) ⇒ - channels get ctx foreach { inj ⇒ - context stop inj.sender - context stop inj.receiver - } - channels -= ctx - settings ++= settings collect { case (addr, c @ ChannelSettings(Some(`ctx`), _, _)) ⇒ (addr, c.copy(ctx = None)) } - /* - * Throttle/Blackhole/Unblock connections, based on the sign of rateMBit; - * will inform throttle actors for that destination if currently connected - * and update the settings for the target address; reply is needed to - * confirm this operation and tell the master that he can proceed. - */ - case ThrottleMsg(target, dir, rateMBit) ⇒ - val setting = retrieveTargetSettings(target) - settings += target -> ((setting getOrElse ChannelSettings() match { - case cs @ ChannelSettings(ctx, _, _) if dir includes Direction.Send ⇒ - ctx foreach (c ⇒ channels get c foreach (_.sender ! SetRate(rateMBit))) - cs.copy(throttleSend = Some(SetRate(rateMBit))) - case x ⇒ x - }) match { - case cs @ ChannelSettings(ctx, _, _) if dir includes Direction.Receive ⇒ - ctx foreach (c ⇒ channels get c foreach (_.receiver ! SetRate(rateMBit))) - cs.copy(throttleReceive = Some(SetRate(rateMBit))) - case x ⇒ x - }) - sender ! "ok" - /* - * Disconnect the currently active connection to the given target; reply is - * needed to confirm this operation and tell the master the he can proceed. - */ - case DisconnectMsg(target, abort) ⇒ - retrieveTargetSettings(target) foreach { - case ChannelSettings(Some(ctx), _, _) ⇒ - val ch = ctx.getChannel - if (abort) { - ch.getConfig.setOption("soLinger", 0) - log.info("aborting connection {}", ch) - } else log.info("closing connection {}", ch) - ch.close - case _ ⇒ log.debug("no connection to {} to close or abort", target) - } - sender ! "ok" - /* - * All data transfers up or down the pipeline are redirected through this - * case statement, which dispatches to the throttle actors for the given - * channel handler context. If none exist yet, they will be created, and - * this is a bit complicated in the case where the first message has not - * yet been exchanged, i.e. the other side’s Address is not yet known - * (keep in mind that an actor system’s remote address is not necessarily - * connected in any way to the IP from which we receive the connection). - */ - case s @ Send(ctx, direction, future, msg) ⇒ - channels get ctx match { - case Some(Injectors(snd, rcv, known)) ⇒ - // if the system registered with an empty name then check if we know it now - if (!known) ChannelAddress.get(ctx.getChannel).foreach(addr ⇒ - ingestContextAddress(ctx, addr, true, Some(snd), Some(rcv))) - if (direction includes Direction.Send) snd ! s - if (direction includes Direction.Receive) rcv ! s - case None ⇒ - // don’t do reverse lookup at first - ctx.getChannel.getRemoteAddress match { - case sockAddr: InetSocketAddress ⇒ - val (ipaddr, ip, port) = (sockAddr.getAddress, sockAddr.getAddress.getHostAddress, sockAddr.getPort) - val (addr, known) = ChannelAddress.get(ctx.getChannel) orElse { - settings collect { case (a @ Address("akka", _, Some(`ip`), Some(`port`)), _) ⇒ a } headOption - } orElse { - // only if raw IP failed, try with hostname - val name = ipaddr.getHostName - if (name == ip) None - else settings collect { case (a @ Address("akka", _, Some(`name`), Some(`port`)), _) ⇒ a } headOption - } match { - case Some(a) ⇒ (a, true) - case None ⇒ (Address("akka", "", ip, port), false) - } - /* - * ^- the above last resort will not match later requests directly, but be - * picked up by retrieveTargetSettings, so that throttle ops are - * applied to the right throttle actors, assuming that there can - * be only one actor system per host:port. - */ - val inj = ingestContextAddress(ctx, addr, known) - if (direction includes Direction.Send) inj.sender ! s - if (direction includes Direction.Receive) inj.receiver ! s - case null ⇒ - log.debug("sending {} in direction {} when socket {} already closed, dropping", msg, direction, ctx.getChannel) - } - } - } -} - -private[akka] object NetworkFailureInjector { - case class RemoveContext(ctx: ChannelHandlerContext) -} - -/** - * Brief overview: all network traffic passes through the `sender`/`receiver` FSMs managed - * by the FailureInjector of the TestConductor extension. These can - * pass through requests immediately, drop them or throttle to a desired rate. The FSMs are - * registered in the TestConductorExt.failureInjector so that settings can be applied from - * the ClientFSMs. - * - * I found that simply forwarding events using ctx.sendUpstream/sendDownstream does not work, - * it deadlocks and gives strange errors; in the end I just trusted the Netty docs which - * recommend to prefer `Channels.write()` and `Channels.fireMessageReceived()`. - * - * INTERNAL API. - */ -private[akka] class NetworkFailureInjector(system: ActorSystem) extends SimpleChannelHandler { - import NetworkFailureInjector._ - - private val log = Logging(system, "FailureInjector") - - private val conductor = TestConductor(system) - private var announced = false - - override def channelConnected(ctx: ChannelHandlerContext, state: ChannelStateEvent) { - state.getValue match { - case a: InetSocketAddress ⇒ - val addr = Address("akka", "", a.getHostName, a.getPort) - log.debug("connected to {}", addr) - case x ⇒ throw new IllegalArgumentException("unknown address type: " + x) - } - } - - override def channelDisconnected(ctx: ChannelHandlerContext, state: ChannelStateEvent) { - log.debug("disconnected from {}", state.getChannel) - conductor.failureInjector ! RemoveContext(ctx) - } - - override def messageReceived(ctx: ChannelHandlerContext, msg: MessageEvent) { - log.debug("upstream(queued): {}", msg) - conductor.failureInjector ! ThrottleActor.Send(ctx, Direction.Receive, Option(msg.getFuture), msg.getMessage) - } - - override def writeRequested(ctx: ChannelHandlerContext, msg: MessageEvent) { - log.debug("downstream(queued): {}", msg) - conductor.failureInjector ! ThrottleActor.Send(ctx, Direction.Send, Option(msg.getFuture), msg.getMessage) - } - -} - -/** - * INTERNAL API. - */ -private[akka] object ThrottleActor { - sealed trait State - case object PassThrough extends State - case object Throttle extends State - case object Blackhole extends State - - case class Data(lastSent: Long, rateMBit: Float, queue: immutable.Queue[Send]) - - case class Send(ctx: ChannelHandlerContext, direction: Direction, future: Option[ChannelFuture], msg: AnyRef) - case class SetRate(rateMBit: Float) - case object Tick -} - -/** - * INTERNAL API. - */ -private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) - extends Actor with LoggingFSM[ThrottleActor.State, ThrottleActor.Data] { - - import ThrottleActor._ - import FSM._ - - private val packetSplitThreshold = TestConductor(context.system).Settings.PacketSplitThreshold - - startWith(PassThrough, Data(0, -1, immutable.Queue())) - - when(PassThrough) { - case Event(s @ Send(_, _, _, msg), _) ⇒ - log.debug("sending msg (PassThrough): {}", msg) - send(s) - stay - } - - when(Throttle) { - case Event(s: Send, data @ Data(_, _, immutable.Queue())) ⇒ - stay using sendThrottled(data.copy(lastSent = System.nanoTime, queue = immutable.Queue(s))) - case Event(s: Send, data) ⇒ - stay using sendThrottled(data.copy(queue = data.queue.enqueue(s))) - case Event(Tick, data) ⇒ - stay using sendThrottled(data) - } - - onTransition { - case Throttle -> PassThrough ⇒ - for (s ← stateData.queue) { - log.debug("sending msg (Transition): {}", s.msg) - send(s) - } - cancelTimer("send") - case Throttle -> Blackhole ⇒ - cancelTimer("send") - } - - when(Blackhole) { - case Event(Send(_, _, _, msg), _) ⇒ - log.debug("dropping msg {}", msg) - stay - } - - whenUnhandled { - case Event(SetRate(rate), d) ⇒ - if (rate > 0) { - goto(Throttle) using d.copy(lastSent = System.nanoTime, rateMBit = rate, queue = immutable.Queue()) - } else if (rate == 0) { - goto(Blackhole) - } else { - goto(PassThrough) - } - } - - initialize - - private def sendThrottled(d: Data): Data = { - val (data, toSend, toTick) = schedule(d) - for (s ← toSend) { - log.debug("sending msg (Tick): {}", s.msg) - send(s) - } - if (!timerActive_?("send")) - for (time ← toTick) { - log.debug("scheduling next Tick in {}", time) - setTimer("send", Tick, time, false) - } - data - } - - private def send(s: Send): Unit = s.direction match { - case Direction.Send ⇒ Channels.write(s.ctx, s.future getOrElse Channels.future(s.ctx.getChannel), s.msg) - case Direction.Receive ⇒ Channels.fireMessageReceived(s.ctx, s.msg) - case _ ⇒ - } - - /** - * Core of the throttling engine: delay Send operations until their bit count - * would actually have had time to travel down the line at the configured - * data rate, and split up send operations which are so big that gaps larger - * than packetSplitThreshold would be planned (they will happen nevertheless - * due to HashedWheelTimer’s semantics, but we compensate by sending more the - * next time, in proportion to how long the Tick was overdue). So, this should - * lead to the correct rate on average, with increased latency of the order of - * HWT granularity. - */ - private def schedule(d: Data): (Data, Seq[Send], Option[FiniteDuration]) = { - val now = System.nanoTime - @tailrec def rec(d: Data, toSend: immutable.Seq[Send]): (Data, immutable.Seq[Send], Option[FiniteDuration]) = { - if (d.queue.isEmpty) (d, toSend, None) - else { - val timeForPacket = d.lastSent + (1000 * size(d.queue.head.msg) / d.rateMBit).toLong - if (timeForPacket <= now) rec(Data(timeForPacket, d.rateMBit, d.queue.tail), toSend :+ d.queue.head) - else { - val splitThreshold = d.lastSent + packetSplitThreshold.toNanos - if (now < splitThreshold) (d, toSend, Some((timeForPacket - now).nanos min (splitThreshold - now).nanos)) - else { - val microsToSend = (now - d.lastSent) / 1000 - val (s1, s2) = split(d.queue.head, (microsToSend * d.rateMBit / 8).toInt) - (d.copy(queue = s2 +: d.queue.tail), toSend :+ s1, Some((timeForPacket - now).nanos min packetSplitThreshold)) - } - } - } - } - rec(d, Nil) - } - - /** - * Split one Send operation in two, cutting off the given number of bytes at - * the front. If it was Direction.Send, i.e. a channel.write(), then also - * split the Future so that a failure in either part will complete the original - * with that failure. Data are not copied, as long as ChannelBuffer.slice does - * not copy them. - */ - private def split(s: Send, bytes: Int): (Send, Send) = { - s.msg match { - case buf: ChannelBuffer ⇒ - val f = s.future map { f ⇒ - val newF = Channels.future(s.ctx.getChannel) - newF.addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) f.cancel() - else future.getCause match { - case null ⇒ - case thr ⇒ f.setFailure(thr) - } - } - }) - newF - } - val b = buf.slice() - b.writerIndex(b.readerIndex + bytes) - buf.readerIndex(buf.readerIndex + bytes) - (Send(s.ctx, s.direction, f, b), Send(s.ctx, s.direction, s.future, buf)) - } - } - - private def size(msg: AnyRef) = msg match { - case b: ChannelBuffer ⇒ b.readableBytes() * 8 - case _ ⇒ throw new UnsupportedOperationException("NetworkFailureInjector only supports ChannelBuffer messages") - } -} - diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 95bfab1ee5..549f74895d 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -18,6 +18,7 @@ import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelH import akka.pattern.{ ask, pipe, AskTimeoutException } import akka.event.{ LoggingAdapter, Logging } import java.net.{ InetSocketAddress, ConnectException } +import akka.remote.transport.ThrottlerTransportAdapter.{ SetThrottle, TokenBucket, Blackhole, Unthrottled } /** * The Player is the client component of the @@ -213,12 +214,21 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) case t: ThrottleMsg ⇒ import settings.QueryTimeout import context.dispatcher // FIXME is this the right EC for the future below? - TestConductor().failureInjector ? t map (_ ⇒ ToServer(Done)) pipeTo self + val mode = if (t.rateMBit < 0.0f) Unthrottled + else if (t.rateMBit == 0.0f) Blackhole + else TokenBucket(500, t.rateMBit * 125000.0, 0, 0) + + val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode)) + cmdFuture onSuccess { + case b: Boolean ⇒ self ! ToServer(Done) + case _ => throw new RuntimeException("Throttle was requested from the TestConductor, but no transport "+ + "adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig") + } stay case d: DisconnectMsg ⇒ import settings.QueryTimeout import context.dispatcher // FIXME is this the right EC for the future below? - TestConductor().failureInjector ? d map (_ ⇒ ToServer(Done)) pipeTo self + // FIXME: Currently ignoring, needs support from Remoting stay case TerminateMsg(exit) ⇒ System.exit(exit) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/TestConductorTransport.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/TestConductorTransport.scala deleted file mode 100644 index f7b7943275..0000000000 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/TestConductorTransport.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.remote.testconductor - -import akka.remote.netty.NettyRemoteTransport -import akka.remote.RemoteSettings -import akka.actor.ExtendedActorSystem -import akka.remote.RemoteActorRefProvider -import org.jboss.netty.channel.ChannelHandler -import org.jboss.netty.channel.ChannelPipelineFactory - -/** - * INTERNAL API. - */ -private[akka] class TestConductorTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) - extends NettyRemoteTransport(_system, _provider) { - - override def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = - new ChannelPipelineFactory { - def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout, isClient) :+ endpoint) - } - -} \ No newline at end of file diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 645a96be89..f4e34c5a34 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -19,7 +19,6 @@ import akka.remote.RemoteActorRefProvider import akka.testkit._ import scala.concurrent.duration._ import akka.remote.testconductor.RoleName -import akka.remote.testconductor.TestConductorTransport import akka.actor.RootActorPath import akka.event.{ Logging, LoggingAdapter } @@ -64,7 +63,7 @@ abstract class MultiNodeConfig { receive = on fsm = on } - akka.remote.log-remote-lifecycle-events = on + akka.remoting.log-remote-lifecycle-events = on """) else ConfigFactory.empty @@ -100,7 +99,7 @@ abstract class MultiNodeConfig { private[testkit] def config: Config = { val transportConfig = - if (_testTransport) ConfigFactory.parseString("akka.remote.transport=" + classOf[TestConductorTransport].getName) + if (_testTransport) ConfigFactory.parseString("akka.remoting.transports.tcp.applied-adapters = [gremlin, trttl]") else ConfigFactory.empty val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: transportConfig :: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil @@ -191,8 +190,8 @@ object MultiNodeSpec { private[testkit] val nodeConfig = mapToConfig(Map( "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", - "akka.remote.netty.hostname" -> selfName, - "akka.remote.netty.port" -> selfPort)) + "akka.remoting.transports.tcp.hostname" -> selfName, + "akka.remoting.transports.tcp.port" -> selfPort)) private[testkit] val baseConfig: Config = ConfigFactory.parseString(""" akka { @@ -409,7 +408,6 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: // useful to see which jvm is running which role, used by LogRoleReplace utility log.info("Role [{}] started with address [{}]", myself.name, - //FIXME: Workaround for old-remoting -- must be removed later system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index b315e5c5d0..cf7a4cb5bb 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -24,7 +24,7 @@ object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { } commonConfig(debugConfig(on = false).withFallback( - ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off"))) + ConfigFactory.parseString("akka.remoting.log-remote-lifecycle-events = off"))) val master = role("master") val slave = role("slave") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index 544ee03ead..f154770105 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -16,6 +16,7 @@ import akka.testkit.LongRunningTest import java.net.InetSocketAddress import java.net.InetAddress import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec, MultiNodeConfig } +import akka.remote.transport.ThrottlerTransportAdapter.Direction object TestConductorMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false)) diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index 103d16089d..7bcbe0a1d1 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -18,7 +18,6 @@ object BarrierSpec { val config = """ akka.testconductor.barrier-timeout = 5s akka.actor.provider = akka.remote.RemoteActorRefProvider - akka.remote.netty.port = 0 akka.actor.debug.fsm = on akka.actor.debug.lifecycle = on """ diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala index 13140adfb5..ee8dc2b4e6 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala @@ -15,7 +15,6 @@ object ControllerSpec { val config = """ akka.testconductor.barrier-timeout = 5s akka.actor.provider = akka.remote.RemoteActorRefProvider - akka.remote.netty.port = 0 akka.actor.debug.fsm = on akka.actor.debug.lifecycle = on """