From d8bb688ecebd5f7b5db60568f2845a475da6161c Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 24 May 2012 17:59:36 +0200 Subject: [PATCH] add comments to clarify code in NetworkFailureInjector.scala --- .../NetworkFailureInjector.scala | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala index b425518044..bf5d7d6007 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala @@ -58,7 +58,9 @@ private[akka] class FailureInjector extends Actor with ActorLogging { } /** - * Retrieve target settings, also if they were sketchy before (i.e. no system name) + * Retrieve target settings, also if they were sketchy before (i.e. no system name). + * In the latter case, copy settings from the sketchy address and remove the old + * mapping. */ def retrieveTargetSettings(target: Address): Option[ChannelSettings] = { settings get target orElse { @@ -68,12 +70,16 @@ private[akka] class FailureInjector extends Actor with ActorLogging { case (Address("akka", "", `host`, `port`), s) ⇒ true case _ ⇒ false } map { - case (_, s) ⇒ settings += target -> s; s + case (a, s) ⇒ settings -= a; settings += target -> s; s } } } def receive = { + /* + * If a channel handler tells us that he’s been disconnected, stop the + * throttle actors and forget about them (but not possibly applied settings) + */ case RemoveContext(ctx) ⇒ channels get ctx foreach { inj ⇒ context stop inj.sender @@ -81,6 +87,12 @@ private[akka] class FailureInjector extends Actor with ActorLogging { } channels -= ctx settings ++= settings collect { case (addr, c @ ChannelSettings(Some(`ctx`), _, _)) ⇒ (addr, c.copy(ctx = None)) } + /* + * Throttle/Blackhole/Unblock connections, based on the sign of rateMBit; + * will inform throttle actors for that destination if currently connected + * and update the settings for the target address; reply is needed to + * confirm this operation and tell the master that he can proceed. + */ case ThrottleMsg(target, dir, rateMBit) ⇒ val setting = retrieveTargetSettings(target) settings += target -> ((setting getOrElse ChannelSettings() match { @@ -95,6 +107,10 @@ private[akka] class FailureInjector extends Actor with ActorLogging { case x ⇒ x }) sender ! "ok" + /* + * Disconnect the currently active connection to the given target; reply is + * needed to confirm this operation and tell the master the he can proceed. + */ case DisconnectMsg(target, abort) ⇒ retrieveTargetSettings(target) foreach { case ChannelSettings(Some(ctx), _, _) ⇒ @@ -107,22 +123,39 @@ private[akka] class FailureInjector extends Actor with ActorLogging { case _ ⇒ log.debug("no connection to {} to close or abort", target) } sender ! "ok" + /* + * All data transfers up or down the pipeline are redirected through this + * case statement, which dispatches to the throttle actors for the given + * channel handler context. If none exist yet, they will be created, and + * this is a bit complicated in the case where the first message has not + * yet been exchanged, i.e. the other side’s Address is not yet known + * (keep in mind that an actor system’s remote address is not necessarily + * connected in any way to the IP from which we receive the connection). + */ case s @ Send(ctx, direction, future, msg) ⇒ channels get ctx match { case Some(Injectors(snd, rcv)) ⇒ if (direction includes Direction.Send) snd ! s if (direction includes Direction.Receive) rcv ! s case None ⇒ + // don’t do reverse lookup at first val (ipaddr, ip, port) = ctx.getChannel.getRemoteAddress match { case s: InetSocketAddress ⇒ (s.getAddress, s.getAddress.getHostAddress, s.getPort) } val addr = ChannelAddress.get(ctx.getChannel) orElse { settings collect { case (a @ Address("akka", _, Some(`ip`), Some(`port`)), _) ⇒ a } headOption } orElse { + // only if raw IP failed, try with hostname val name = ipaddr.getHostName if (name == ip) None else settings collect { case (a @ Address("akka", _, Some(`name`), Some(`port`)), _) ⇒ a } headOption - } getOrElse Address("akka", "", ip, port) // this will not match later requests directly, but be picked up by retrieveTargetSettings + } getOrElse Address("akka", "", ip, port) + /* + * ^- the above last resort will not match later requests directly, but be + * picked up by retrieveTargetSettings, so that throttle ops are + * applied to the right throttle actors, assuming that there can + * be only one actor system per host:port. + */ val inj = ingestContextAddress(ctx, addr) if (direction includes Direction.Send) inj.sender ! s if (direction includes Direction.Receive) inj.receiver ! s @@ -276,6 +309,16 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) case _ ⇒ } + /** + * Core of the throttling engine: delay Send operations until their bit count + * would actually have had time to travel down the line at the configured + * data rate, and split up send operations which are so big that gaps larger + * than packetSplitThreshold would be planned (they will happen nevertheless + * due to HashedWheelTimer’s semantics, but we compensate by sending more the + * next time, in proportion to how long the Tick was overdue). So, this should + * lead to the correct rate on average, with increased latency of the order of + * HWT granularity. + */ private def schedule(d: Data): (Data, Seq[Send], Option[Duration]) = { val now = System.nanoTime @tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[Duration]) = { @@ -297,6 +340,13 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext) rec(d, Seq()) } + /** + * Split one Send operation in two, cutting off the given number of bytes at + * the front. If it was Direction.Send, i.e. a channel.write(), then also + * split the Future so that a failure in either part will complete the original + * with that failure. Data are not copied, as long as ChannelBuffer.slice does + * not copy them. + */ private def split(s: Send, bytes: Int): (Send, Send) = { s.msg match { case buf: ChannelBuffer ⇒