add comments to clarify code in NetworkFailureInjector.scala

This commit is contained in:
Roland 2012-05-24 17:59:36 +02:00
parent 037caf5136
commit d8bb688ece

View file

@ -58,7 +58,9 @@ private[akka] class FailureInjector extends Actor with ActorLogging {
} }
/** /**
* Retrieve target settings, also if they were sketchy before (i.e. no system name) * Retrieve target settings, also if they were sketchy before (i.e. no system name).
* In the latter case, copy settings from the sketchy address and remove the old
* mapping.
*/ */
def retrieveTargetSettings(target: Address): Option[ChannelSettings] = { def retrieveTargetSettings(target: Address): Option[ChannelSettings] = {
settings get target orElse { settings get target orElse {
@ -68,12 +70,16 @@ private[akka] class FailureInjector extends Actor with ActorLogging {
case (Address("akka", "", `host`, `port`), s) true case (Address("akka", "", `host`, `port`), s) true
case _ false case _ false
} map { } map {
case (_, s) settings += target -> s; s case (a, s) settings -= a; settings += target -> s; s
} }
} }
} }
def receive = { def receive = {
/*
* If a channel handler tells us that hes been disconnected, stop the
* throttle actors and forget about them (but not possibly applied settings)
*/
case RemoveContext(ctx) case RemoveContext(ctx)
channels get ctx foreach { inj channels get ctx foreach { inj
context stop inj.sender context stop inj.sender
@ -81,6 +87,12 @@ private[akka] class FailureInjector extends Actor with ActorLogging {
} }
channels -= ctx channels -= ctx
settings ++= settings collect { case (addr, c @ ChannelSettings(Some(`ctx`), _, _)) (addr, c.copy(ctx = None)) } settings ++= settings collect { case (addr, c @ ChannelSettings(Some(`ctx`), _, _)) (addr, c.copy(ctx = None)) }
/*
* Throttle/Blackhole/Unblock connections, based on the sign of rateMBit;
* will inform throttle actors for that destination if currently connected
* and update the settings for the target address; reply is needed to
* confirm this operation and tell the master that he can proceed.
*/
case ThrottleMsg(target, dir, rateMBit) case ThrottleMsg(target, dir, rateMBit)
val setting = retrieveTargetSettings(target) val setting = retrieveTargetSettings(target)
settings += target -> ((setting getOrElse ChannelSettings() match { settings += target -> ((setting getOrElse ChannelSettings() match {
@ -95,6 +107,10 @@ private[akka] class FailureInjector extends Actor with ActorLogging {
case x x case x x
}) })
sender ! "ok" sender ! "ok"
/*
* Disconnect the currently active connection to the given target; reply is
* needed to confirm this operation and tell the master the he can proceed.
*/
case DisconnectMsg(target, abort) case DisconnectMsg(target, abort)
retrieveTargetSettings(target) foreach { retrieveTargetSettings(target) foreach {
case ChannelSettings(Some(ctx), _, _) case ChannelSettings(Some(ctx), _, _)
@ -107,22 +123,39 @@ private[akka] class FailureInjector extends Actor with ActorLogging {
case _ log.debug("no connection to {} to close or abort", target) case _ log.debug("no connection to {} to close or abort", target)
} }
sender ! "ok" sender ! "ok"
/*
* All data transfers up or down the pipeline are redirected through this
* case statement, which dispatches to the throttle actors for the given
* channel handler context. If none exist yet, they will be created, and
* this is a bit complicated in the case where the first message has not
* yet been exchanged, i.e. the other sides Address is not yet known
* (keep in mind that an actor systems remote address is not necessarily
* connected in any way to the IP from which we receive the connection).
*/
case s @ Send(ctx, direction, future, msg) case s @ Send(ctx, direction, future, msg)
channels get ctx match { channels get ctx match {
case Some(Injectors(snd, rcv)) case Some(Injectors(snd, rcv))
if (direction includes Direction.Send) snd ! s if (direction includes Direction.Send) snd ! s
if (direction includes Direction.Receive) rcv ! s if (direction includes Direction.Receive) rcv ! s
case None case None
// dont do reverse lookup at first
val (ipaddr, ip, port) = ctx.getChannel.getRemoteAddress match { val (ipaddr, ip, port) = ctx.getChannel.getRemoteAddress match {
case s: InetSocketAddress (s.getAddress, s.getAddress.getHostAddress, s.getPort) case s: InetSocketAddress (s.getAddress, s.getAddress.getHostAddress, s.getPort)
} }
val addr = ChannelAddress.get(ctx.getChannel) orElse { val addr = ChannelAddress.get(ctx.getChannel) orElse {
settings collect { case (a @ Address("akka", _, Some(`ip`), Some(`port`)), _) a } headOption settings collect { case (a @ Address("akka", _, Some(`ip`), Some(`port`)), _) a } headOption
} orElse { } orElse {
// only if raw IP failed, try with hostname
val name = ipaddr.getHostName val name = ipaddr.getHostName
if (name == ip) None if (name == ip) None
else settings collect { case (a @ Address("akka", _, Some(`name`), Some(`port`)), _) a } headOption else settings collect { case (a @ Address("akka", _, Some(`name`), Some(`port`)), _) a } headOption
} getOrElse Address("akka", "", ip, port) // this will not match later requests directly, but be picked up by retrieveTargetSettings } getOrElse Address("akka", "", ip, port)
/*
* ^- the above last resort will not match later requests directly, but be
* picked up by retrieveTargetSettings, so that throttle ops are
* applied to the right throttle actors, assuming that there can
* be only one actor system per host:port.
*/
val inj = ingestContextAddress(ctx, addr) val inj = ingestContextAddress(ctx, addr)
if (direction includes Direction.Send) inj.sender ! s if (direction includes Direction.Send) inj.sender ! s
if (direction includes Direction.Receive) inj.receiver ! s if (direction includes Direction.Receive) inj.receiver ! s
@ -276,6 +309,16 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext)
case _ case _
} }
/**
* Core of the throttling engine: delay Send operations until their bit count
* would actually have had time to travel down the line at the configured
* data rate, and split up send operations which are so big that gaps larger
* than packetSplitThreshold would be planned (they will happen nevertheless
* due to HashedWheelTimers semantics, but we compensate by sending more the
* next time, in proportion to how long the Tick was overdue). So, this should
* lead to the correct rate on average, with increased latency of the order of
* HWT granularity.
*/
private def schedule(d: Data): (Data, Seq[Send], Option[Duration]) = { private def schedule(d: Data): (Data, Seq[Send], Option[Duration]) = {
val now = System.nanoTime val now = System.nanoTime
@tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[Duration]) = { @tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[Duration]) = {
@ -297,6 +340,13 @@ private[akka] class ThrottleActor(channelContext: ChannelHandlerContext)
rec(d, Seq()) rec(d, Seq())
} }
/**
* Split one Send operation in two, cutting off the given number of bytes at
* the front. If it was Direction.Send, i.e. a channel.write(), then also
* split the Future so that a failure in either part will complete the original
* with that failure. Data are not copied, as long as ChannelBuffer.slice does
* not copy them.
*/
private def split(s: Send, bytes: Int): (Send, Send) = { private def split(s: Send, bytes: Int): (Send, Send) = {
s.msg match { s.msg match {
case buf: ChannelBuffer case buf: ChannelBuffer